datanode/heartbeat/handler/
flush_region.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::time::Instant;
16
17use common_meta::instruction::{
18    FlushErrorStrategy, FlushRegionReply, FlushStrategy, Instruction, InstructionReply,
19};
20use common_telemetry::{debug, warn};
21use store_api::region_request::{RegionFlushRequest, RegionRequest};
22use store_api::storage::RegionId;
23
24use crate::error::{self, RegionNotFoundSnafu, RegionNotReadySnafu, Result, UnexpectedSnafu};
25use crate::heartbeat::handler::{HandlerContext, InstructionHandler};
26
27pub struct FlushRegionsHandler;
28
29#[async_trait::async_trait]
30impl InstructionHandler for FlushRegionsHandler {
31    async fn handle(
32        &self,
33        ctx: &HandlerContext,
34        instruction: Instruction,
35    ) -> Option<InstructionReply> {
36        let start_time = Instant::now();
37        let flush_regions = instruction.into_flush_regions().unwrap();
38        let strategy = flush_regions.strategy;
39        let region_ids = flush_regions.region_ids;
40        let error_strategy = flush_regions.error_strategy;
41
42        let reply = if matches!(strategy, FlushStrategy::Async) {
43            // Asynchronous hint mode: fire-and-forget, no reply expected
44            ctx.handle_flush_hint(region_ids).await;
45            None
46        } else {
47            // Synchronous mode: return reply with results
48            let reply = ctx.handle_flush_sync(region_ids, error_strategy).await;
49            Some(InstructionReply::FlushRegions(reply))
50        };
51
52        let elapsed = start_time.elapsed();
53        debug!(
54            "FlushRegions strategy: {:?}, elapsed: {:?}, reply: {:?}",
55            strategy, elapsed, reply
56        );
57
58        reply
59    }
60}
61
62impl HandlerContext {
63    /// Performs the actual region flush operation.
64    async fn perform_region_flush(&self, region_id: RegionId) -> Result<()> {
65        let request = RegionRequest::Flush(RegionFlushRequest {
66            row_group_size: None,
67        });
68        self.region_server
69            .handle_request(region_id, request)
70            .await?;
71        Ok(())
72    }
73
74    /// Handles asynchronous flush hints (fire-and-forget).
75    async fn handle_flush_hint(&self, region_ids: Vec<RegionId>) {
76        let start_time = Instant::now();
77        for region_id in &region_ids {
78            let result = self.perform_region_flush(*region_id).await;
79            match result {
80                Ok(_) => {}
81                Err(error::Error::RegionNotFound { .. }) => {
82                    warn!(
83                        "Received a flush region hint from meta, but target region: {} is not found.",
84                        region_id
85                    );
86                }
87                Err(err) => {
88                    warn!("Failed to flush region: {}, error: {}", region_id, err);
89                }
90            }
91        }
92        let elapsed = start_time.elapsed();
93        debug!(
94            "Flush regions hint: {:?}, elapsed: {:?}",
95            region_ids, elapsed
96        );
97    }
98
99    /// Handles synchronous flush operations with proper error handling and replies.
100    async fn handle_flush_sync(
101        &self,
102        region_ids: Vec<RegionId>,
103        error_strategy: FlushErrorStrategy,
104    ) -> FlushRegionReply {
105        let mut results = Vec::with_capacity(region_ids.len());
106
107        for region_id in region_ids {
108            let result = self.flush_single_region_sync(region_id).await;
109
110            match &result {
111                Ok(_) => results.push((region_id, Ok(()))),
112                Err(err) => {
113                    // Convert error::Error to String for FlushRegionReply compatibility
114                    let error_string = err.to_string();
115                    results.push((region_id, Err(error_string)));
116
117                    // For fail-fast strategy, abort on first error
118                    if matches!(error_strategy, FlushErrorStrategy::FailFast) {
119                        break;
120                    }
121                }
122            }
123        }
124
125        FlushRegionReply::from_results(results)
126    }
127
128    /// Flushes a single region synchronously with proper error handling.
129    async fn flush_single_region_sync(&self, region_id: RegionId) -> Result<()> {
130        // Check if region is leader and writable
131        let Some(writable) = self.region_server.is_region_leader(region_id) else {
132            return Err(RegionNotFoundSnafu { region_id }.build());
133        };
134
135        if !writable {
136            return Err(RegionNotReadySnafu { region_id }.build());
137        }
138
139        // Register and execute the flush task
140        let region_server_moved = self.region_server.clone();
141        let register_result = self
142            .flush_tasks
143            .try_register(
144                region_id,
145                Box::pin(async move {
146                    region_server_moved
147                        .handle_request(
148                            region_id,
149                            RegionRequest::Flush(RegionFlushRequest {
150                                row_group_size: None,
151                            }),
152                        )
153                        .await?;
154                    Ok(())
155                }),
156            )
157            .await;
158
159        if register_result.is_busy() {
160            warn!("Another flush task is running for the region: {region_id}");
161        }
162
163        let mut watcher = register_result.into_watcher();
164        match self.flush_tasks.wait_until_finish(&mut watcher).await {
165            Ok(()) => Ok(()),
166            Err(err) => Err(UnexpectedSnafu {
167                violated: format!("Flush task failed: {err:?}"),
168            }
169            .build()),
170        }
171    }
172}
173
#[cfg(test)]
mod tests {
    use std::sync::{Arc, RwLock};

    use common_meta::instruction::{FlushErrorStrategy, FlushRegions};
    use mito2::engine::MITO_ENGINE_NAME;
    use store_api::storage::RegionId;

    use super::*;
    use crate::tests::{MockRegionEngine, mock_region_server};

    /// Async hint mode: every registered region is flushed in order and no reply
    /// is produced; hints for unknown regions are ignored without a reply.
    #[tokio::test]
    async fn test_handle_flush_region_hint() {
        let flushed_region_ids: Arc<RwLock<Vec<RegionId>>> = Arc::new(RwLock::new(Vec::new()));

        let mock_region_server = mock_region_server();
        let region_ids = (0..16).map(|i| RegionId::new(1024, i)).collect::<Vec<_>>();
        for region_id in &region_ids {
            let flushed_region_ids_ref = flushed_region_ids.clone();
            let (mock_engine, _) =
                MockRegionEngine::with_custom_apply_fn(MITO_ENGINE_NAME, move |region_engine| {
                    region_engine.handle_request_mock_fn =
                        Some(Box::new(move |region_id, _request| {
                            flushed_region_ids_ref.write().unwrap().push(region_id);
                            Ok(0)
                        }))
                });
            mock_region_server.register_test_region(*region_id, mock_engine);
        }
        let handler_context = HandlerContext::new_for_test(mock_region_server);

        // Async hint mode
        let flush_instruction = FlushRegions::async_batch(region_ids.clone());
        let reply = FlushRegionsHandler
            .handle(
                &handler_context,
                Instruction::FlushRegions(flush_instruction),
            )
            .await;
        assert!(reply.is_none()); // Hint mode returns no reply
        assert_eq!(*flushed_region_ids.read().unwrap(), region_ids);

        // Non-existent regions
        flushed_region_ids.write().unwrap().clear();
        let not_found_region_ids = (0..2).map(|i| RegionId::new(2048, i)).collect::<Vec<_>>();
        let flush_instruction = FlushRegions::async_batch(not_found_region_ids);
        let reply = FlushRegionsHandler
            .handle(
                &handler_context,
                Instruction::FlushRegions(flush_instruction),
            )
            .await;
        assert!(reply.is_none());
        assert!(flushed_region_ids.read().unwrap().is_empty());
    }

    /// Sync mode with a single registered region: the reply reports success and
    /// the engine saw exactly one request for that region.
    #[tokio::test]
    async fn test_handle_flush_region_sync_single() {
        let flushed_region_ids: Arc<RwLock<Vec<RegionId>>> = Arc::new(RwLock::new(Vec::new()));

        let mock_region_server = mock_region_server();
        let region_id = RegionId::new(1024, 1);

        let flushed_region_ids_ref = flushed_region_ids.clone();
        let (mock_engine, _) =
            MockRegionEngine::with_custom_apply_fn(MITO_ENGINE_NAME, move |region_engine| {
                region_engine.handle_request_mock_fn = Some(Box::new(move |region_id, _request| {
                    flushed_region_ids_ref.write().unwrap().push(region_id);
                    Ok(0)
                }))
            });
        mock_region_server.register_test_region(region_id, mock_engine);
        let handler_context = HandlerContext::new_for_test(mock_region_server);

        let flush_instruction = FlushRegions::sync_single(region_id);
        let reply = FlushRegionsHandler
            .handle(
                &handler_context,
                Instruction::FlushRegions(flush_instruction),
            )
            .await;
        let flush_reply = reply.unwrap().expect_flush_regions_reply();
        assert!(flush_reply.overall_success);
        assert_eq!(flush_reply.results.len(), 1);
        assert_eq!(flush_reply.results[0].0, region_id);
        assert!(flush_reply.results[0].1.is_ok());
        assert_eq!(*flushed_region_ids.read().unwrap(), vec![region_id]);
    }

    /// Sync batch with fail-fast: processing stops right after the first failing
    /// region, so later regions never show up in the reply.
    #[tokio::test]
    async fn test_handle_flush_region_sync_batch_fail_fast() {
        let flushed_region_ids: Arc<RwLock<Vec<RegionId>>> = Arc::new(RwLock::new(Vec::new()));

        let mock_region_server = mock_region_server();
        let region_ids = vec![
            RegionId::new(1024, 1),
            RegionId::new(1024, 2),
            RegionId::new(1024, 3),
        ];

        // Register only the first region, others will fail
        let flushed_region_ids_ref = flushed_region_ids.clone();
        let (mock_engine, _) =
            MockRegionEngine::with_custom_apply_fn(MITO_ENGINE_NAME, move |region_engine| {
                region_engine.handle_request_mock_fn = Some(Box::new(move |region_id, _request| {
                    flushed_region_ids_ref.write().unwrap().push(region_id);
                    Ok(0)
                }))
            });
        mock_region_server.register_test_region(region_ids[0], mock_engine);
        let handler_context = HandlerContext::new_for_test(mock_region_server);

        // Sync batch with fail-fast strategy
        let flush_instruction =
            FlushRegions::sync_batch(region_ids.clone(), FlushErrorStrategy::FailFast);
        let reply = FlushRegionsHandler
            .handle(
                &handler_context,
                Instruction::FlushRegions(flush_instruction),
            )
            .await;
        let flush_reply = reply.unwrap().expect_flush_regions_reply();
        assert!(!flush_reply.overall_success); // Should fail due to non-existent regions
        // With fail-fast, the registered region succeeds, the first unregistered
        // region fails, and the third region is never attempted.
        assert_eq!(flush_reply.results.len(), 2);
        assert_eq!(flush_reply.results[0].0, region_ids[0]);
        assert!(flush_reply.results[0].1.is_ok());
        assert_eq!(flush_reply.results[1].0, region_ids[1]);
        assert!(flush_reply.results[1].1.is_err());
        // Only the registered region reached the engine.
        assert_eq!(*flushed_region_ids.read().unwrap(), vec![region_ids[0]]);
    }

    /// Sync batch with try-all: every region is attempted and reported, even
    /// after a failure.
    #[tokio::test]
    async fn test_handle_flush_region_sync_batch_try_all() {
        let flushed_region_ids: Arc<RwLock<Vec<RegionId>>> = Arc::new(RwLock::new(Vec::new()));

        let mock_region_server = mock_region_server();
        let region_ids = vec![RegionId::new(1024, 1), RegionId::new(1024, 2)];

        // Register only the first region
        let flushed_region_ids_ref = flushed_region_ids.clone();
        let (mock_engine, _) =
            MockRegionEngine::with_custom_apply_fn(MITO_ENGINE_NAME, move |region_engine| {
                region_engine.handle_request_mock_fn = Some(Box::new(move |region_id, _request| {
                    flushed_region_ids_ref.write().unwrap().push(region_id);
                    Ok(0)
                }))
            });
        mock_region_server.register_test_region(region_ids[0], mock_engine);
        let handler_context = HandlerContext::new_for_test(mock_region_server);

        // Sync batch with try-all strategy
        let flush_instruction =
            FlushRegions::sync_batch(region_ids.clone(), FlushErrorStrategy::TryAll);
        let reply = FlushRegionsHandler
            .handle(
                &handler_context,
                Instruction::FlushRegions(flush_instruction),
            )
            .await;
        let flush_reply = reply.unwrap().expect_flush_regions_reply();
        assert!(!flush_reply.overall_success); // Should fail due to one non-existent region
        // With try-all, should process all regions
        assert_eq!(flush_reply.results.len(), region_ids.len());
        // First should succeed, second should fail
        assert_eq!(flush_reply.results[0].0, region_ids[0]);
        assert!(flush_reply.results[0].1.is_ok());
        assert_eq!(flush_reply.results[1].0, region_ids[1]);
        assert!(flush_reply.results[1].1.is_err());
        // Only the registered region reached the engine.
        assert_eq!(*flushed_region_ids.read().unwrap(), vec![region_ids[0]]);
    }
}