Skip to main content

datanode/heartbeat/handler/
flush_region.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::time::Instant;
16
17use common_meta::instruction::{
18    FlushErrorStrategy, FlushRegionReply, FlushRegions, FlushStrategy, InstructionReply,
19};
20use common_telemetry::{debug, warn};
21use store_api::region_request::{RegionFlushReason, RegionFlushRequest, RegionRequest};
22use store_api::storage::RegionId;
23
24use crate::error::{self, RegionNotFoundSnafu, RegionNotReadySnafu, Result, UnexpectedSnafu};
25use crate::heartbeat::handler::{HandlerContext, InstructionHandler};
26
27pub struct FlushRegionsHandler;
28
29#[async_trait::async_trait]
30impl InstructionHandler for FlushRegionsHandler {
31    type Instruction = FlushRegions;
32
33    async fn handle(
34        &self,
35        ctx: &HandlerContext,
36        flush_regions: FlushRegions,
37    ) -> Option<InstructionReply> {
38        let start_time = Instant::now();
39        let strategy = flush_regions.strategy;
40        let region_ids = flush_regions.region_ids;
41        let error_strategy = flush_regions.error_strategy;
42        let reason = flush_regions.reason;
43
44        let reply = if matches!(strategy, FlushStrategy::Async) {
45            // Asynchronous hint mode: fire-and-forget, no reply expected
46            ctx.handle_flush_hint(region_ids, reason).await;
47            None
48        } else {
49            // Synchronous mode: return reply with results
50            let reply = ctx
51                .handle_flush_sync(region_ids, error_strategy, reason)
52                .await;
53            Some(InstructionReply::FlushRegions(reply))
54        };
55
56        let elapsed = start_time.elapsed();
57        debug!(
58            "FlushRegions strategy: {:?}, elapsed: {:?}, reply: {:?}",
59            strategy, elapsed, reply
60        );
61
62        reply
63    }
64}
65
66impl HandlerContext {
67    /// Performs the actual region flush operation.
68    async fn perform_region_flush(
69        &self,
70        region_id: RegionId,
71        reason: Option<RegionFlushReason>,
72    ) -> Result<()> {
73        let request = RegionRequest::Flush(RegionFlushRequest {
74            reason,
75            ..Default::default()
76        });
77        self.region_server
78            .handle_request(region_id, request)
79            .await?;
80        Ok(())
81    }
82
83    /// Handles asynchronous flush hints (fire-and-forget).
84    async fn handle_flush_hint(
85        &self,
86        region_ids: Vec<RegionId>,
87        reason: Option<RegionFlushReason>,
88    ) {
89        let start_time = Instant::now();
90        for region_id in &region_ids {
91            let result = self.perform_region_flush(*region_id, reason).await;
92            match result {
93                Ok(_) => {}
94                Err(error::Error::RegionNotFound { .. }) => {
95                    warn!(
96                        "Received a flush region hint from meta, but target region: {} is not found.",
97                        region_id
98                    );
99                }
100                Err(err) => {
101                    warn!("Failed to flush region: {}, error: {}", region_id, err);
102                }
103            }
104        }
105        let elapsed = start_time.elapsed();
106        debug!(
107            "Flush regions hint: {:?}, elapsed: {:?}",
108            region_ids, elapsed
109        );
110    }
111
112    /// Handles synchronous flush operations with proper error handling and replies.
113    async fn handle_flush_sync(
114        &self,
115        region_ids: Vec<RegionId>,
116        error_strategy: FlushErrorStrategy,
117        reason: Option<RegionFlushReason>,
118    ) -> FlushRegionReply {
119        let mut results = Vec::with_capacity(region_ids.len());
120
121        for region_id in region_ids {
122            let result = self.flush_single_region_sync(region_id, reason).await;
123
124            match &result {
125                Ok(_) => results.push((region_id, Ok(()))),
126                Err(err) => {
127                    // Convert error::Error to String for FlushRegionReply compatibility
128                    let error_string = err.to_string();
129                    results.push((region_id, Err(error_string)));
130
131                    // For fail-fast strategy, abort on first error
132                    if matches!(error_strategy, FlushErrorStrategy::FailFast) {
133                        break;
134                    }
135                }
136            }
137        }
138
139        FlushRegionReply::from_results(results)
140    }
141
142    /// Flushes a single region synchronously with proper error handling.
143    async fn flush_single_region_sync(
144        &self,
145        region_id: RegionId,
146        reason: Option<RegionFlushReason>,
147    ) -> Result<()> {
148        // Check if region is leader and writable
149        let Some(writable) = self.region_server.is_region_leader(region_id) else {
150            return Err(RegionNotFoundSnafu { region_id }.build());
151        };
152
153        if !writable {
154            return Err(RegionNotReadySnafu { region_id }.build());
155        }
156
157        // Register and execute the flush task
158        let region_server_moved = self.region_server.clone();
159        let register_result = self
160            .flush_tasks
161            .try_register(
162                region_id,
163                Box::pin(async move {
164                    region_server_moved
165                        .handle_request(
166                            region_id,
167                            RegionRequest::Flush(RegionFlushRequest {
168                                reason,
169                                ..Default::default()
170                            }),
171                        )
172                        .await?;
173                    Ok(())
174                }),
175            )
176            .await;
177
178        if register_result.is_busy() {
179            warn!("Another flush task is running for the region: {region_id}");
180        }
181
182        let mut watcher = register_result.into_watcher();
183        match self.flush_tasks.wait_until_finish(&mut watcher).await {
184            Ok(()) => Ok(()),
185            Err(err) => Err(UnexpectedSnafu {
186                violated: format!("Flush task failed: {err:?}"),
187            }
188            .build()),
189        }
190    }
191}
192
#[cfg(test)]
mod tests {
    use std::sync::{Arc, RwLock};

    use common_meta::instruction::{FlushErrorStrategy, FlushRegions};
    use common_meta::kv_backend::memory::MemoryKvBackend;
    use mito2::engine::MITO_ENGINE_NAME;
    use store_api::storage::RegionId;

    use super::*;
    use crate::tests::{MockRegionEngine, mock_region_server};

    /// Flush requests observed by the mock engine, in arrival order.
    type FlushedRequests = Arc<RwLock<Vec<(RegionId, Option<RegionFlushReason>)>>>;

    #[tokio::test]
    async fn test_handle_flush_region_hint() {
        let flushed_requests: FlushedRequests = Arc::new(RwLock::new(Vec::new()));

        let mock_region_server = mock_region_server();
        let region_ids = (0..16).map(|i| RegionId::new(1024, i)).collect::<Vec<_>>();
        for region_id in &region_ids {
            let flushed_requests_ref = flushed_requests.clone();
            let (mock_engine, _) =
                MockRegionEngine::with_custom_apply_fn(MITO_ENGINE_NAME, move |region_engine| {
                    region_engine.handle_request_mock_fn =
                        Some(Box::new(move |region_id, request| {
                            let RegionRequest::Flush(request) = request else {
                                panic!("Expected flush request");
                            };
                            flushed_requests_ref
                                .write()
                                .unwrap()
                                .push((region_id, request.reason));
                            Ok(0)
                        }))
                });
            mock_region_server.register_test_region(*region_id, mock_engine);
        }
        let kv_backend = Arc::new(MemoryKvBackend::new());
        let handler_context = HandlerContext::new_for_test(mock_region_server, kv_backend);

        // Async hint mode
        let flush_instruction = FlushRegions::async_batch(region_ids.clone())
            .with_reason(RegionFlushReason::RemoteWalPrune);
        let reply = FlushRegionsHandler
            .handle(&handler_context, flush_instruction)
            .await;
        assert!(reply.is_none()); // Hint mode returns no reply
        let expected = region_ids
            .iter()
            .map(|region_id| (*region_id, Some(RegionFlushReason::RemoteWalPrune)))
            .collect::<Vec<_>>();
        assert_eq!(*flushed_requests.read().unwrap(), expected);

        // Non-existent regions
        flushed_requests.write().unwrap().clear();
        let not_found_region_ids = (0..2).map(|i| RegionId::new(2048, i)).collect::<Vec<_>>();
        let flush_instruction = FlushRegions::async_batch(not_found_region_ids);
        let reply = FlushRegionsHandler
            .handle(&handler_context, flush_instruction)
            .await;
        assert!(reply.is_none());
        assert!(flushed_requests.read().unwrap().is_empty());
    }

    #[tokio::test]
    async fn test_handle_flush_region_sync_single() {
        let flushed_requests: FlushedRequests = Arc::new(RwLock::new(Vec::new()));

        let mock_region_server = mock_region_server();
        let region_id = RegionId::new(1024, 1);

        let flushed_requests_ref = flushed_requests.clone();
        let (mock_engine, _) =
            MockRegionEngine::with_custom_apply_fn(MITO_ENGINE_NAME, move |region_engine| {
                region_engine.handle_request_mock_fn = Some(Box::new(move |region_id, request| {
                    let RegionRequest::Flush(request) = request else {
                        panic!("Expected flush request");
                    };
                    flushed_requests_ref
                        .write()
                        .unwrap()
                        .push((region_id, request.reason));
                    Ok(0)
                }))
            });
        mock_region_server.register_test_region(region_id, mock_engine);
        let kv_backend = Arc::new(MemoryKvBackend::new());
        let handler_context = HandlerContext::new_for_test(mock_region_server, kv_backend);

        let flush_instruction =
            FlushRegions::sync_single(region_id).with_reason(RegionFlushReason::Repartition);
        let reply = FlushRegionsHandler
            .handle(&handler_context, flush_instruction)
            .await;
        let flush_reply = reply.unwrap().expect_flush_regions_reply();
        assert!(flush_reply.overall_success);
        assert_eq!(flush_reply.results.len(), 1);
        assert_eq!(flush_reply.results[0].0, region_id);
        assert!(flush_reply.results[0].1.is_ok());
        assert_eq!(
            *flushed_requests.read().unwrap(),
            vec![(region_id, Some(RegionFlushReason::Repartition))]
        );
    }

    #[tokio::test]
    async fn test_handle_flush_region_sync_batch_fail_fast() {
        let flushed_region_ids: Arc<RwLock<Vec<RegionId>>> = Arc::new(RwLock::new(Vec::new()));

        let mock_region_server = mock_region_server();
        let region_ids = vec![
            RegionId::new(1024, 1),
            RegionId::new(1024, 2),
            RegionId::new(1024, 3),
        ];

        // Register only the first region, others will fail
        let flushed_region_ids_ref = flushed_region_ids.clone();
        let (mock_engine, _) =
            MockRegionEngine::with_custom_apply_fn(MITO_ENGINE_NAME, move |region_engine| {
                region_engine.handle_request_mock_fn = Some(Box::new(move |region_id, _request| {
                    flushed_region_ids_ref.write().unwrap().push(region_id);
                    Ok(0)
                }))
            });
        mock_region_server.register_test_region(region_ids[0], mock_engine);
        let kv_backend = Arc::new(MemoryKvBackend::new());
        let handler_context = HandlerContext::new_for_test(mock_region_server, kv_backend);

        // Sync batch with fail-fast strategy
        let flush_instruction =
            FlushRegions::sync_batch(region_ids.clone(), FlushErrorStrategy::FailFast);
        let reply = FlushRegionsHandler
            .handle(&handler_context, flush_instruction)
            .await;
        let flush_reply = reply.unwrap().expect_flush_regions_reply();
        assert!(!flush_reply.overall_success); // Should fail due to non-existent regions
        // Fail-fast stops right after the first failure: the first (registered) region
        // is flushed, the second (unregistered) one fails, and the third is never tried.
        assert_eq!(flush_reply.results.len(), 2);
        assert_eq!(flush_reply.results[0].0, region_ids[0]);
        assert!(flush_reply.results[0].1.is_ok());
        assert_eq!(flush_reply.results[1].0, region_ids[1]);
        assert!(flush_reply.results[1].1.is_err());
        // Only the registered region ever reached the engine.
        assert_eq!(*flushed_region_ids.read().unwrap(), vec![region_ids[0]]);
    }

    #[tokio::test]
    async fn test_handle_flush_region_sync_batch_try_all() {
        let flushed_region_ids: Arc<RwLock<Vec<RegionId>>> = Arc::new(RwLock::new(Vec::new()));

        let mock_region_server = mock_region_server();
        let region_ids = vec![RegionId::new(1024, 1), RegionId::new(1024, 2)];

        // Register only the first region
        let flushed_region_ids_ref = flushed_region_ids.clone();
        let (mock_engine, _) =
            MockRegionEngine::with_custom_apply_fn(MITO_ENGINE_NAME, move |region_engine| {
                region_engine.handle_request_mock_fn = Some(Box::new(move |region_id, _request| {
                    flushed_region_ids_ref.write().unwrap().push(region_id);
                    Ok(0)
                }))
            });
        mock_region_server.register_test_region(region_ids[0], mock_engine);
        let kv_backend = Arc::new(MemoryKvBackend::new());
        let handler_context = HandlerContext::new_for_test(mock_region_server, kv_backend);

        // Sync batch with try-all strategy
        let flush_instruction =
            FlushRegions::sync_batch(region_ids.clone(), FlushErrorStrategy::TryAll);
        let reply = FlushRegionsHandler
            .handle(&handler_context, flush_instruction)
            .await;
        let flush_reply = reply.unwrap().expect_flush_regions_reply();
        assert!(!flush_reply.overall_success); // Should fail due to one non-existent region
        // With try-all, should process all regions
        assert_eq!(flush_reply.results.len(), region_ids.len());
        // First should succeed, second should fail
        assert_eq!(flush_reply.results[0].0, region_ids[0]);
        assert!(flush_reply.results[0].1.is_ok());
        assert_eq!(flush_reply.results[1].0, region_ids[1]);
        assert!(flush_reply.results[1].1.is_err());
        // Only the registered region ever reached the engine.
        assert_eq!(*flushed_region_ids.read().unwrap(), vec![region_ids[0]]);
    }

    #[test]
    fn test_flush_regions_display() {
        let region_id = RegionId::new(1024, 1);
        let flush_regions = FlushRegions::sync_single(region_id);
        let display = format!("{}", flush_regions);
        assert_eq!(
            display,
            "FlushRegions(region_ids=[4398046511105(1024, 1)], strategy=Sync, error_strategy=FailFast, reason=None)"
        );
    }
}