datanode/heartbeat/handler/
flush_region.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::time::Instant;
16
17use common_meta::instruction::{
18    FlushErrorStrategy, FlushRegionReply, FlushRegions, FlushStrategy, InstructionReply,
19};
20use common_telemetry::{debug, warn};
21use store_api::region_request::{RegionFlushRequest, RegionRequest};
22use store_api::storage::RegionId;
23
24use crate::error::{self, RegionNotFoundSnafu, RegionNotReadySnafu, Result, UnexpectedSnafu};
25use crate::heartbeat::handler::{HandlerContext, InstructionHandler};
26
27pub struct FlushRegionsHandler;
28
29#[async_trait::async_trait]
30impl InstructionHandler for FlushRegionsHandler {
31    type Instruction = FlushRegions;
32
33    async fn handle(
34        &self,
35        ctx: &HandlerContext,
36        flush_regions: FlushRegions,
37    ) -> Option<InstructionReply> {
38        let start_time = Instant::now();
39        let strategy = flush_regions.strategy;
40        let region_ids = flush_regions.region_ids;
41        let error_strategy = flush_regions.error_strategy;
42
43        let reply = if matches!(strategy, FlushStrategy::Async) {
44            // Asynchronous hint mode: fire-and-forget, no reply expected
45            ctx.handle_flush_hint(region_ids).await;
46            None
47        } else {
48            // Synchronous mode: return reply with results
49            let reply = ctx.handle_flush_sync(region_ids, error_strategy).await;
50            Some(InstructionReply::FlushRegions(reply))
51        };
52
53        let elapsed = start_time.elapsed();
54        debug!(
55            "FlushRegions strategy: {:?}, elapsed: {:?}, reply: {:?}",
56            strategy, elapsed, reply
57        );
58
59        reply
60    }
61}
62
63impl HandlerContext {
64    /// Performs the actual region flush operation.
65    async fn perform_region_flush(&self, region_id: RegionId) -> Result<()> {
66        let request = RegionRequest::Flush(RegionFlushRequest {
67            row_group_size: None,
68        });
69        self.region_server
70            .handle_request(region_id, request)
71            .await?;
72        Ok(())
73    }
74
75    /// Handles asynchronous flush hints (fire-and-forget).
76    async fn handle_flush_hint(&self, region_ids: Vec<RegionId>) {
77        let start_time = Instant::now();
78        for region_id in &region_ids {
79            let result = self.perform_region_flush(*region_id).await;
80            match result {
81                Ok(_) => {}
82                Err(error::Error::RegionNotFound { .. }) => {
83                    warn!(
84                        "Received a flush region hint from meta, but target region: {} is not found.",
85                        region_id
86                    );
87                }
88                Err(err) => {
89                    warn!("Failed to flush region: {}, error: {}", region_id, err);
90                }
91            }
92        }
93        let elapsed = start_time.elapsed();
94        debug!(
95            "Flush regions hint: {:?}, elapsed: {:?}",
96            region_ids, elapsed
97        );
98    }
99
100    /// Handles synchronous flush operations with proper error handling and replies.
101    async fn handle_flush_sync(
102        &self,
103        region_ids: Vec<RegionId>,
104        error_strategy: FlushErrorStrategy,
105    ) -> FlushRegionReply {
106        let mut results = Vec::with_capacity(region_ids.len());
107
108        for region_id in region_ids {
109            let result = self.flush_single_region_sync(region_id).await;
110
111            match &result {
112                Ok(_) => results.push((region_id, Ok(()))),
113                Err(err) => {
114                    // Convert error::Error to String for FlushRegionReply compatibility
115                    let error_string = err.to_string();
116                    results.push((region_id, Err(error_string)));
117
118                    // For fail-fast strategy, abort on first error
119                    if matches!(error_strategy, FlushErrorStrategy::FailFast) {
120                        break;
121                    }
122                }
123            }
124        }
125
126        FlushRegionReply::from_results(results)
127    }
128
129    /// Flushes a single region synchronously with proper error handling.
130    async fn flush_single_region_sync(&self, region_id: RegionId) -> Result<()> {
131        // Check if region is leader and writable
132        let Some(writable) = self.region_server.is_region_leader(region_id) else {
133            return Err(RegionNotFoundSnafu { region_id }.build());
134        };
135
136        if !writable {
137            return Err(RegionNotReadySnafu { region_id }.build());
138        }
139
140        // Register and execute the flush task
141        let region_server_moved = self.region_server.clone();
142        let register_result = self
143            .flush_tasks
144            .try_register(
145                region_id,
146                Box::pin(async move {
147                    region_server_moved
148                        .handle_request(
149                            region_id,
150                            RegionRequest::Flush(RegionFlushRequest {
151                                row_group_size: None,
152                            }),
153                        )
154                        .await?;
155                    Ok(())
156                }),
157            )
158            .await;
159
160        if register_result.is_busy() {
161            warn!("Another flush task is running for the region: {region_id}");
162        }
163
164        let mut watcher = register_result.into_watcher();
165        match self.flush_tasks.wait_until_finish(&mut watcher).await {
166            Ok(()) => Ok(()),
167            Err(err) => Err(UnexpectedSnafu {
168                violated: format!("Flush task failed: {err:?}"),
169            }
170            .build()),
171        }
172    }
173}
174
#[cfg(test)]
mod tests {
    use std::sync::{Arc, RwLock};

    use common_meta::instruction::{FlushErrorStrategy, FlushRegions};
    use mito2::engine::MITO_ENGINE_NAME;
    use store_api::storage::RegionId;

    use super::*;
    use crate::tests::{MockRegionEngine, mock_region_server};

    /// Hint mode never produces a reply: registered regions get flushed in order and
    /// unknown regions are silently skipped.
    #[tokio::test]
    async fn test_handle_flush_region_hint() {
        let flushed_region_ids: Arc<RwLock<Vec<RegionId>>> = Arc::new(RwLock::new(Vec::new()));

        let mock_region_server = mock_region_server();
        let region_ids = (0..16).map(|i| RegionId::new(1024, i)).collect::<Vec<_>>();
        for region_id in &region_ids {
            let flushed_region_ids_ref = flushed_region_ids.clone();
            let (mock_engine, _) =
                MockRegionEngine::with_custom_apply_fn(MITO_ENGINE_NAME, move |region_engine| {
                    region_engine.handle_request_mock_fn =
                        Some(Box::new(move |region_id, _request| {
                            flushed_region_ids_ref.write().unwrap().push(region_id);
                            Ok(0)
                        }))
                });
            mock_region_server.register_test_region(*region_id, mock_engine);
        }
        let handler_context = HandlerContext::new_for_test(mock_region_server);

        // Async hint mode
        let flush_instruction = FlushRegions::async_batch(region_ids.clone());
        let reply = FlushRegionsHandler
            .handle(&handler_context, flush_instruction)
            .await;
        assert!(reply.is_none()); // Hint mode returns no reply
        assert_eq!(*flushed_region_ids.read().unwrap(), region_ids);

        // Non-existent regions
        flushed_region_ids.write().unwrap().clear();
        let not_found_region_ids = (0..2).map(|i| RegionId::new(2048, i)).collect::<Vec<_>>();
        let flush_instruction = FlushRegions::async_batch(not_found_region_ids);
        let reply = FlushRegionsHandler
            .handle(&handler_context, flush_instruction)
            .await;
        assert!(reply.is_none());
        assert!(flushed_region_ids.read().unwrap().is_empty());
    }

    /// A synchronous flush of one registered region reports a single successful result.
    #[tokio::test]
    async fn test_handle_flush_region_sync_single() {
        let flushed_region_ids: Arc<RwLock<Vec<RegionId>>> = Arc::new(RwLock::new(Vec::new()));

        let mock_region_server = mock_region_server();
        let region_id = RegionId::new(1024, 1);

        let flushed_region_ids_ref = flushed_region_ids.clone();
        let (mock_engine, _) =
            MockRegionEngine::with_custom_apply_fn(MITO_ENGINE_NAME, move |region_engine| {
                region_engine.handle_request_mock_fn = Some(Box::new(move |region_id, _request| {
                    flushed_region_ids_ref.write().unwrap().push(region_id);
                    Ok(0)
                }))
            });
        mock_region_server.register_test_region(region_id, mock_engine);
        let handler_context = HandlerContext::new_for_test(mock_region_server);

        let flush_instruction = FlushRegions::sync_single(region_id);
        let reply = FlushRegionsHandler
            .handle(&handler_context, flush_instruction)
            .await;
        let flush_reply = reply.unwrap().expect_flush_regions_reply();
        assert!(flush_reply.overall_success);
        assert_eq!(flush_reply.results.len(), 1);
        assert_eq!(flush_reply.results[0].0, region_id);
        assert!(flush_reply.results[0].1.is_ok());
        assert_eq!(*flushed_region_ids.read().unwrap(), vec![region_id]);
    }

    /// With fail-fast, the batch stops right after the first failing region: later
    /// regions must not show up in the reply and must not be flushed.
    #[tokio::test]
    async fn test_handle_flush_region_sync_batch_fail_fast() {
        let flushed_region_ids: Arc<RwLock<Vec<RegionId>>> = Arc::new(RwLock::new(Vec::new()));

        let mock_region_server = mock_region_server();
        let region_ids = vec![
            RegionId::new(1024, 1),
            RegionId::new(1024, 2),
            RegionId::new(1024, 3),
        ];

        // Register only the first region, others will fail
        let flushed_region_ids_ref = flushed_region_ids.clone();
        let (mock_engine, _) =
            MockRegionEngine::with_custom_apply_fn(MITO_ENGINE_NAME, move |region_engine| {
                region_engine.handle_request_mock_fn = Some(Box::new(move |region_id, _request| {
                    flushed_region_ids_ref.write().unwrap().push(region_id);
                    Ok(0)
                }))
            });
        mock_region_server.register_test_region(region_ids[0], mock_engine);
        let handler_context = HandlerContext::new_for_test(mock_region_server);

        // Sync batch with fail-fast strategy
        let flush_instruction =
            FlushRegions::sync_batch(region_ids.clone(), FlushErrorStrategy::FailFast);
        let reply = FlushRegionsHandler
            .handle(&handler_context, flush_instruction)
            .await;
        let flush_reply = reply.unwrap().expect_flush_regions_reply();
        assert!(!flush_reply.overall_success); // Should fail due to non-existent regions

        // The first region succeeds, the second one is missing and aborts the batch,
        // so the third region is never attempted. A `<=` bound would also accept a
        // handler that ignores fail-fast, hence the exact expectations.
        assert_eq!(flush_reply.results.len(), 2);
        assert_eq!(flush_reply.results[0].0, region_ids[0]);
        assert!(flush_reply.results[0].1.is_ok());
        assert_eq!(flush_reply.results[1].0, region_ids[1]);
        assert!(flush_reply.results[1].1.is_err());
        // Only the registered region reached the engine.
        assert_eq!(*flushed_region_ids.read().unwrap(), vec![region_ids[0]]);
    }

    /// With try-all, every region is attempted and reported even after a failure.
    #[tokio::test]
    async fn test_handle_flush_region_sync_batch_try_all() {
        let flushed_region_ids: Arc<RwLock<Vec<RegionId>>> = Arc::new(RwLock::new(Vec::new()));

        let mock_region_server = mock_region_server();
        let region_ids = vec![RegionId::new(1024, 1), RegionId::new(1024, 2)];

        // Register only the first region
        let flushed_region_ids_ref = flushed_region_ids.clone();
        let (mock_engine, _) =
            MockRegionEngine::with_custom_apply_fn(MITO_ENGINE_NAME, move |region_engine| {
                region_engine.handle_request_mock_fn = Some(Box::new(move |region_id, _request| {
                    flushed_region_ids_ref.write().unwrap().push(region_id);
                    Ok(0)
                }))
            });
        mock_region_server.register_test_region(region_ids[0], mock_engine);
        let handler_context = HandlerContext::new_for_test(mock_region_server);

        // Sync batch with try-all strategy
        let flush_instruction =
            FlushRegions::sync_batch(region_ids.clone(), FlushErrorStrategy::TryAll);
        let reply = FlushRegionsHandler
            .handle(&handler_context, flush_instruction)
            .await;
        let flush_reply = reply.unwrap().expect_flush_regions_reply();
        assert!(!flush_reply.overall_success); // Should fail due to one non-existent region
        // With try-all, should process all regions
        assert_eq!(flush_reply.results.len(), region_ids.len());
        // First should succeed, second should fail; results keep the request order
        assert_eq!(flush_reply.results[0].0, region_ids[0]);
        assert!(flush_reply.results[0].1.is_ok());
        assert_eq!(flush_reply.results[1].0, region_ids[1]);
        assert!(flush_reply.results[1].1.is_err());
        // Only the registered region reached the engine.
        assert_eq!(*flushed_region_ids.read().unwrap(), vec![region_ids[0]]);
    }
}
323}