datanode/heartbeat/handler/
flush_region.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::time::Instant;
16
17use common_meta::instruction::{
18    FlushErrorStrategy, FlushRegionReply, FlushRegions, FlushStrategy, InstructionReply,
19};
20use common_telemetry::{debug, warn};
21use futures_util::future::BoxFuture;
22use store_api::region_request::{RegionFlushRequest, RegionRequest};
23use store_api::storage::RegionId;
24
25use crate::error::{self, RegionNotFoundSnafu, RegionNotReadySnafu, UnexpectedSnafu};
26use crate::heartbeat::handler::HandlerContext;
27
28impl HandlerContext {
29    /// Performs the actual region flush operation.
30    async fn perform_region_flush(&self, region_id: RegionId) -> Result<(), error::Error> {
31        let request = RegionRequest::Flush(RegionFlushRequest {
32            row_group_size: None,
33        });
34        self.region_server
35            .handle_request(region_id, request)
36            .await?;
37        Ok(())
38    }
39
40    /// Handles asynchronous flush hints (fire-and-forget).
41    async fn handle_flush_hint(&self, region_ids: Vec<RegionId>) {
42        let start_time = Instant::now();
43        for region_id in &region_ids {
44            let result = self.perform_region_flush(*region_id).await;
45            match result {
46                Ok(_) => {}
47                Err(error::Error::RegionNotFound { .. }) => {
48                    warn!(
49                        "Received a flush region hint from meta, but target region: {} is not found.",
50                        region_id
51                    );
52                }
53                Err(err) => {
54                    warn!("Failed to flush region: {}, error: {}", region_id, err);
55                }
56            }
57        }
58        let elapsed = start_time.elapsed();
59        debug!(
60            "Flush regions hint: {:?}, elapsed: {:?}",
61            region_ids, elapsed
62        );
63    }
64
65    /// Handles synchronous flush operations with proper error handling and replies.
66    async fn handle_flush_sync(
67        &self,
68        region_ids: Vec<RegionId>,
69        error_strategy: FlushErrorStrategy,
70    ) -> FlushRegionReply {
71        let mut results = Vec::with_capacity(region_ids.len());
72
73        for region_id in region_ids {
74            let result = self.flush_single_region_sync(region_id).await;
75
76            match &result {
77                Ok(_) => results.push((region_id, Ok(()))),
78                Err(err) => {
79                    // Convert error::Error to String for FlushRegionReply compatibility
80                    let error_string = err.to_string();
81                    results.push((region_id, Err(error_string)));
82
83                    // For fail-fast strategy, abort on first error
84                    if matches!(error_strategy, FlushErrorStrategy::FailFast) {
85                        break;
86                    }
87                }
88            }
89        }
90
91        FlushRegionReply::from_results(results)
92    }
93
94    /// Flushes a single region synchronously with proper error handling.
95    async fn flush_single_region_sync(&self, region_id: RegionId) -> Result<(), error::Error> {
96        // Check if region is leader and writable
97        let Some(writable) = self.region_server.is_region_leader(region_id) else {
98            return Err(RegionNotFoundSnafu { region_id }.build());
99        };
100
101        if !writable {
102            return Err(RegionNotReadySnafu { region_id }.build());
103        }
104
105        // Register and execute the flush task
106        let region_server_moved = self.region_server.clone();
107        let register_result = self
108            .flush_tasks
109            .try_register(
110                region_id,
111                Box::pin(async move {
112                    region_server_moved
113                        .handle_request(
114                            region_id,
115                            RegionRequest::Flush(RegionFlushRequest {
116                                row_group_size: None,
117                            }),
118                        )
119                        .await?;
120                    Ok(())
121                }),
122            )
123            .await;
124
125        if register_result.is_busy() {
126            warn!("Another flush task is running for the region: {region_id}");
127        }
128
129        let mut watcher = register_result.into_watcher();
130        match self.flush_tasks.wait_until_finish(&mut watcher).await {
131            Ok(()) => Ok(()),
132            Err(err) => Err(UnexpectedSnafu {
133                violated: format!("Flush task failed: {err:?}"),
134            }
135            .build()),
136        }
137    }
138
139    /// Unified handler for FlushRegions with all flush semantics.
140    pub(crate) fn handle_flush_regions_instruction(
141        self,
142        flush_regions: FlushRegions,
143    ) -> BoxFuture<'static, Option<InstructionReply>> {
144        Box::pin(async move {
145            let start_time = Instant::now();
146            let strategy = flush_regions.strategy;
147            let region_ids = flush_regions.region_ids;
148            let error_strategy = flush_regions.error_strategy;
149
150            let reply = if matches!(strategy, FlushStrategy::Async) {
151                // Asynchronous hint mode: fire-and-forget, no reply expected
152                self.handle_flush_hint(region_ids).await;
153                None
154            } else {
155                // Synchronous mode: return reply with results
156                let reply = self.handle_flush_sync(region_ids, error_strategy).await;
157                Some(InstructionReply::FlushRegions(reply))
158            };
159
160            let elapsed = start_time.elapsed();
161            debug!(
162                "FlushRegions strategy: {:?}, elapsed: {:?}, reply: {:?}",
163                strategy, elapsed, reply
164            );
165
166            reply
167        })
168    }
169}
170
#[cfg(test)]
mod tests {
    //! Tests for the `FlushRegions` instruction handler.
    //!
    //! Every test registers mock engines whose request hook records the id of each region
    //! it is asked to handle, so the tests can check both the reply and which regions
    //! were actually flushed.

    use std::sync::{Arc, RwLock};

    use common_meta::instruction::{FlushErrorStrategy, FlushRegions};
    use mito2::engine::MITO_ENGINE_NAME;
    use store_api::storage::RegionId;

    use super::*;
    use crate::tests::{mock_region_server, MockRegionEngine};

    /// Hint mode flushes every known region in order, returns no reply, and silently
    /// skips regions that are not registered.
    #[tokio::test]
    async fn test_handle_flush_region_hint() {
        let flushed_region_ids: Arc<RwLock<Vec<RegionId>>> = Arc::new(RwLock::new(Vec::new()));

        let mock_region_server = mock_region_server();
        let region_ids = (0..16).map(|i| RegionId::new(1024, i)).collect::<Vec<_>>();
        for region_id in &region_ids {
            let flushed_region_ids_ref = flushed_region_ids.clone();
            let (mock_engine, _) =
                MockRegionEngine::with_custom_apply_fn(MITO_ENGINE_NAME, move |region_engine| {
                    region_engine.handle_request_mock_fn =
                        Some(Box::new(move |region_id, _request| {
                            flushed_region_ids_ref.write().unwrap().push(region_id);
                            Ok(0)
                        }))
                });
            mock_region_server.register_test_region(*region_id, mock_engine);
        }
        let handler_context = HandlerContext::new_for_test(mock_region_server);

        // Async hint mode
        let flush_instruction = FlushRegions::async_batch(region_ids.clone());
        let reply = handler_context
            .clone()
            .handle_flush_regions_instruction(flush_instruction)
            .await;
        assert!(reply.is_none()); // Hint mode returns no reply
        assert_eq!(*flushed_region_ids.read().unwrap(), region_ids);

        // Non-existent regions
        flushed_region_ids.write().unwrap().clear();
        let not_found_region_ids = (0..2).map(|i| RegionId::new(2048, i)).collect::<Vec<_>>();
        let flush_instruction = FlushRegions::async_batch(not_found_region_ids);
        let reply = handler_context
            .handle_flush_regions_instruction(flush_instruction)
            .await;
        assert!(reply.is_none());
        assert!(flushed_region_ids.read().unwrap().is_empty());
    }

    /// A synchronous flush of one registered region reports a single successful result.
    #[tokio::test]
    async fn test_handle_flush_region_sync_single() {
        let flushed_region_ids: Arc<RwLock<Vec<RegionId>>> = Arc::new(RwLock::new(Vec::new()));

        let mock_region_server = mock_region_server();
        let region_id = RegionId::new(1024, 1);

        let flushed_region_ids_ref = flushed_region_ids.clone();
        let (mock_engine, _) =
            MockRegionEngine::with_custom_apply_fn(MITO_ENGINE_NAME, move |region_engine| {
                region_engine.handle_request_mock_fn = Some(Box::new(move |region_id, _request| {
                    flushed_region_ids_ref.write().unwrap().push(region_id);
                    Ok(0)
                }))
            });
        mock_region_server.register_test_region(region_id, mock_engine);
        let handler_context = HandlerContext::new_for_test(mock_region_server);

        let flush_instruction = FlushRegions::sync_single(region_id);
        let reply = handler_context
            .handle_flush_regions_instruction(flush_instruction)
            .await;

        let Some(InstructionReply::FlushRegions(flush_reply)) = reply else {
            panic!("Expected FlushRegions reply");
        };
        assert!(flush_reply.overall_success);
        assert_eq!(flush_reply.results.len(), 1);
        assert_eq!(flush_reply.results[0].0, region_id);
        assert!(flush_reply.results[0].1.is_ok());

        assert_eq!(*flushed_region_ids.read().unwrap(), vec![region_id]);
    }

    /// With fail-fast, processing stops right after the first failing region.
    #[tokio::test]
    async fn test_handle_flush_region_sync_batch_fail_fast() {
        let flushed_region_ids: Arc<RwLock<Vec<RegionId>>> = Arc::new(RwLock::new(Vec::new()));

        let mock_region_server = mock_region_server();
        let region_ids = vec![
            RegionId::new(1024, 1),
            RegionId::new(1024, 2),
            RegionId::new(1024, 3),
        ];

        // Register only the first region, others will fail
        let flushed_region_ids_ref = flushed_region_ids.clone();
        let (mock_engine, _) =
            MockRegionEngine::with_custom_apply_fn(MITO_ENGINE_NAME, move |region_engine| {
                region_engine.handle_request_mock_fn = Some(Box::new(move |region_id, _request| {
                    flushed_region_ids_ref.write().unwrap().push(region_id);
                    Ok(0)
                }))
            });
        mock_region_server.register_test_region(region_ids[0], mock_engine);
        let handler_context = HandlerContext::new_for_test(mock_region_server);

        // Sync batch with fail-fast strategy
        let flush_instruction =
            FlushRegions::sync_batch(region_ids.clone(), FlushErrorStrategy::FailFast);
        let reply = handler_context
            .handle_flush_regions_instruction(flush_instruction)
            .await;

        let Some(InstructionReply::FlushRegions(flush_reply)) = reply else {
            panic!("Expected FlushRegions reply");
        };
        // Should fail due to non-existent regions
        assert!(!flush_reply.overall_success);
        // With fail-fast, the first region succeeds, the second fails and aborts the
        // batch, so the third region must never be attempted.
        assert_eq!(flush_reply.results.len(), 2);
        assert_eq!(flush_reply.results[0].0, region_ids[0]);
        assert!(flush_reply.results[0].1.is_ok());
        assert_eq!(flush_reply.results[1].0, region_ids[1]);
        assert!(flush_reply.results[1].1.is_err());

        // Only the registered region reached the engine
        assert_eq!(*flushed_region_ids.read().unwrap(), vec![region_ids[0]]);
    }

    /// With try-all, every region is attempted even after a failure.
    #[tokio::test]
    async fn test_handle_flush_region_sync_batch_try_all() {
        let flushed_region_ids: Arc<RwLock<Vec<RegionId>>> = Arc::new(RwLock::new(Vec::new()));

        let mock_region_server = mock_region_server();
        let region_ids = vec![RegionId::new(1024, 1), RegionId::new(1024, 2)];

        // Register only the first region
        let flushed_region_ids_ref = flushed_region_ids.clone();
        let (mock_engine, _) =
            MockRegionEngine::with_custom_apply_fn(MITO_ENGINE_NAME, move |region_engine| {
                region_engine.handle_request_mock_fn = Some(Box::new(move |region_id, _request| {
                    flushed_region_ids_ref.write().unwrap().push(region_id);
                    Ok(0)
                }))
            });
        mock_region_server.register_test_region(region_ids[0], mock_engine);
        let handler_context = HandlerContext::new_for_test(mock_region_server);

        // Sync batch with try-all strategy
        let flush_instruction =
            FlushRegions::sync_batch(region_ids.clone(), FlushErrorStrategy::TryAll);
        let reply = handler_context
            .handle_flush_regions_instruction(flush_instruction)
            .await;

        let Some(InstructionReply::FlushRegions(flush_reply)) = reply else {
            panic!("Expected FlushRegions reply");
        };
        // Should fail due to one non-existent region
        assert!(!flush_reply.overall_success);
        // With try-all, should process all regions
        assert_eq!(flush_reply.results.len(), region_ids.len());
        // First should succeed, second should fail
        assert_eq!(flush_reply.results[0].0, region_ids[0]);
        assert!(flush_reply.results[0].1.is_ok());
        assert_eq!(flush_reply.results[1].0, region_ids[1]);
        assert!(flush_reply.results[1].1.is_err());

        // Only the registered region reached the engine
        assert_eq!(*flushed_region_ids.read().unwrap(), vec![region_ids[0]]);
    }
}