datanode/heartbeat/handler/
flush_region.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::time::Instant;
16
17use common_meta::instruction::{
18    FlushErrorStrategy, FlushRegionReply, FlushRegions, FlushStrategy, InstructionReply,
19};
20use common_telemetry::{debug, warn};
21use store_api::region_request::{RegionFlushRequest, RegionRequest};
22use store_api::storage::RegionId;
23
24use crate::error::{self, RegionNotFoundSnafu, RegionNotReadySnafu, Result, UnexpectedSnafu};
25use crate::heartbeat::handler::{HandlerContext, InstructionHandler};
26
27pub struct FlushRegionsHandler;
28
29#[async_trait::async_trait]
30impl InstructionHandler for FlushRegionsHandler {
31    type Instruction = FlushRegions;
32
33    async fn handle(
34        &self,
35        ctx: &HandlerContext,
36        flush_regions: FlushRegions,
37    ) -> Option<InstructionReply> {
38        let start_time = Instant::now();
39        let strategy = flush_regions.strategy;
40        let region_ids = flush_regions.region_ids;
41        let error_strategy = flush_regions.error_strategy;
42
43        let reply = if matches!(strategy, FlushStrategy::Async) {
44            // Asynchronous hint mode: fire-and-forget, no reply expected
45            ctx.handle_flush_hint(region_ids).await;
46            None
47        } else {
48            // Synchronous mode: return reply with results
49            let reply = ctx.handle_flush_sync(region_ids, error_strategy).await;
50            Some(InstructionReply::FlushRegions(reply))
51        };
52
53        let elapsed = start_time.elapsed();
54        debug!(
55            "FlushRegions strategy: {:?}, elapsed: {:?}, reply: {:?}",
56            strategy, elapsed, reply
57        );
58
59        reply
60    }
61}
62
63impl HandlerContext {
64    /// Performs the actual region flush operation.
65    async fn perform_region_flush(&self, region_id: RegionId) -> Result<()> {
66        let request = RegionRequest::Flush(RegionFlushRequest {
67            row_group_size: None,
68        });
69        self.region_server
70            .handle_request(region_id, request)
71            .await?;
72        Ok(())
73    }
74
75    /// Handles asynchronous flush hints (fire-and-forget).
76    async fn handle_flush_hint(&self, region_ids: Vec<RegionId>) {
77        let start_time = Instant::now();
78        for region_id in &region_ids {
79            let result = self.perform_region_flush(*region_id).await;
80            match result {
81                Ok(_) => {}
82                Err(error::Error::RegionNotFound { .. }) => {
83                    warn!(
84                        "Received a flush region hint from meta, but target region: {} is not found.",
85                        region_id
86                    );
87                }
88                Err(err) => {
89                    warn!("Failed to flush region: {}, error: {}", region_id, err);
90                }
91            }
92        }
93        let elapsed = start_time.elapsed();
94        debug!(
95            "Flush regions hint: {:?}, elapsed: {:?}",
96            region_ids, elapsed
97        );
98    }
99
100    /// Handles synchronous flush operations with proper error handling and replies.
101    async fn handle_flush_sync(
102        &self,
103        region_ids: Vec<RegionId>,
104        error_strategy: FlushErrorStrategy,
105    ) -> FlushRegionReply {
106        let mut results = Vec::with_capacity(region_ids.len());
107
108        for region_id in region_ids {
109            let result = self.flush_single_region_sync(region_id).await;
110
111            match &result {
112                Ok(_) => results.push((region_id, Ok(()))),
113                Err(err) => {
114                    // Convert error::Error to String for FlushRegionReply compatibility
115                    let error_string = err.to_string();
116                    results.push((region_id, Err(error_string)));
117
118                    // For fail-fast strategy, abort on first error
119                    if matches!(error_strategy, FlushErrorStrategy::FailFast) {
120                        break;
121                    }
122                }
123            }
124        }
125
126        FlushRegionReply::from_results(results)
127    }
128
129    /// Flushes a single region synchronously with proper error handling.
130    async fn flush_single_region_sync(&self, region_id: RegionId) -> Result<()> {
131        // Check if region is leader and writable
132        let Some(writable) = self.region_server.is_region_leader(region_id) else {
133            return Err(RegionNotFoundSnafu { region_id }.build());
134        };
135
136        if !writable {
137            return Err(RegionNotReadySnafu { region_id }.build());
138        }
139
140        // Register and execute the flush task
141        let region_server_moved = self.region_server.clone();
142        let register_result = self
143            .flush_tasks
144            .try_register(
145                region_id,
146                Box::pin(async move {
147                    region_server_moved
148                        .handle_request(
149                            region_id,
150                            RegionRequest::Flush(RegionFlushRequest {
151                                row_group_size: None,
152                            }),
153                        )
154                        .await?;
155                    Ok(())
156                }),
157            )
158            .await;
159
160        if register_result.is_busy() {
161            warn!("Another flush task is running for the region: {region_id}");
162        }
163
164        let mut watcher = register_result.into_watcher();
165        match self.flush_tasks.wait_until_finish(&mut watcher).await {
166            Ok(()) => Ok(()),
167            Err(err) => Err(UnexpectedSnafu {
168                violated: format!("Flush task failed: {err:?}"),
169            }
170            .build()),
171        }
172    }
173}
174
#[cfg(test)]
mod tests {
    use std::sync::{Arc, RwLock};

    use common_meta::instruction::{FlushErrorStrategy, FlushRegions};
    use common_meta::kv_backend::memory::MemoryKvBackend;
    use mito2::engine::MITO_ENGINE_NAME;
    use store_api::storage::RegionId;

    use super::*;
    use crate::tests::{MockRegionEngine, mock_region_server};

    /// Shared record of the region ids whose mock engine received a request.
    type FlushLog = Arc<RwLock<Vec<RegionId>>>;

    /// Builds a test [`HandlerContext`] in which every id in `registered` is
    /// backed by a mock engine that appends the requested region id to
    /// `flush_log` and reports success.
    fn new_test_context(registered: &[RegionId], flush_log: &FlushLog) -> HandlerContext {
        let region_server = mock_region_server();
        for &region_id in registered {
            let flush_log = flush_log.clone();
            let (mock_engine, _) =
                MockRegionEngine::with_custom_apply_fn(MITO_ENGINE_NAME, move |region_engine| {
                    region_engine.handle_request_mock_fn =
                        Some(Box::new(move |region_id, _request| {
                            flush_log.write().unwrap().push(region_id);
                            Ok(0)
                        }))
                });
            region_server.register_test_region(region_id, mock_engine);
        }
        let kv_backend = Arc::new(MemoryKvBackend::new());
        HandlerContext::new_for_test(region_server, kv_backend)
    }

    #[tokio::test]
    async fn test_handle_flush_region_hint() {
        let flush_log = FlushLog::default();
        let region_ids = (0..16).map(|i| RegionId::new(1024, i)).collect::<Vec<_>>();
        let handler_context = new_test_context(&region_ids, &flush_log);

        // Async hint mode
        let flush_instruction = FlushRegions::async_batch(region_ids.clone());
        let reply = FlushRegionsHandler
            .handle(&handler_context, flush_instruction)
            .await;
        assert!(reply.is_none()); // Hint mode returns no reply
        assert_eq!(*flush_log.read().unwrap(), region_ids);

        // Non-existent regions
        flush_log.write().unwrap().clear();
        let not_found_region_ids = (0..2).map(|i| RegionId::new(2048, i)).collect::<Vec<_>>();
        let flush_instruction = FlushRegions::async_batch(not_found_region_ids);
        let reply = FlushRegionsHandler
            .handle(&handler_context, flush_instruction)
            .await;
        assert!(reply.is_none());
        assert!(flush_log.read().unwrap().is_empty());
    }

    #[tokio::test]
    async fn test_handle_flush_region_sync_single() {
        let flush_log = FlushLog::default();
        let region_id = RegionId::new(1024, 1);
        let handler_context = new_test_context(&[region_id], &flush_log);

        let flush_instruction = FlushRegions::sync_single(region_id);
        let reply = FlushRegionsHandler
            .handle(&handler_context, flush_instruction)
            .await;
        let flush_reply = reply.unwrap().expect_flush_regions_reply();
        assert!(flush_reply.overall_success);
        assert_eq!(flush_reply.results.len(), 1);
        assert_eq!(flush_reply.results[0].0, region_id);
        assert!(flush_reply.results[0].1.is_ok());
        assert_eq!(*flush_log.read().unwrap(), vec![region_id]);
    }

    #[tokio::test]
    async fn test_handle_flush_region_sync_batch_fail_fast() {
        let flush_log = FlushLog::default();
        let region_ids = vec![
            RegionId::new(1024, 1),
            RegionId::new(1024, 2),
            RegionId::new(1024, 3),
        ];

        // Register only the first region, others will fail
        let handler_context = new_test_context(&region_ids[..1], &flush_log);

        // Sync batch with fail-fast strategy
        let flush_instruction =
            FlushRegions::sync_batch(region_ids.clone(), FlushErrorStrategy::FailFast);
        let reply = FlushRegionsHandler
            .handle(&handler_context, flush_instruction)
            .await;
        let flush_reply = reply.unwrap().expect_flush_regions_reply();
        assert!(!flush_reply.overall_success); // Should fail due to non-existent regions

        // With fail-fast, processing stops right after the first failure: the
        // first region succeeds, the second fails, and the third is never
        // attempted, so it has no entry in the results.
        assert_eq!(flush_reply.results.len(), 2);
        assert_eq!(flush_reply.results[0].0, region_ids[0]);
        assert!(flush_reply.results[0].1.is_ok());
        assert_eq!(flush_reply.results[1].0, region_ids[1]);
        assert!(flush_reply.results[1].1.is_err());
        assert_eq!(*flush_log.read().unwrap(), vec![region_ids[0]]);
    }

    #[tokio::test]
    async fn test_handle_flush_region_sync_batch_try_all() {
        let flush_log = FlushLog::default();
        let region_ids = vec![RegionId::new(1024, 1), RegionId::new(1024, 2)];

        // Register only the first region
        let handler_context = new_test_context(&region_ids[..1], &flush_log);

        // Sync batch with try-all strategy
        let flush_instruction =
            FlushRegions::sync_batch(region_ids.clone(), FlushErrorStrategy::TryAll);
        let reply = FlushRegionsHandler
            .handle(&handler_context, flush_instruction)
            .await;
        let flush_reply = reply.unwrap().expect_flush_regions_reply();
        assert!(!flush_reply.overall_success); // Should fail due to one non-existent region
        // With try-all, should process all regions
        assert_eq!(flush_reply.results.len(), region_ids.len());
        // First should succeed, second should fail
        assert!(flush_reply.results[0].1.is_ok());
        assert!(flush_reply.results[1].1.is_err());
        assert_eq!(*flush_log.read().unwrap(), vec![region_ids[0]]);
    }

    #[test]
    fn test_flush_regions_display() {
        let region_id = RegionId::new(1024, 1);
        let flush_regions = FlushRegions::sync_single(region_id);
        let display = flush_regions.to_string();
        assert_eq!(
            display,
            "FlushRegions(region_ids=[4398046511105(1024, 1)], strategy=Sync, error_strategy=FailFast)"
        );
    }
}