datanode/heartbeat/handler/
flush_region.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use common_meta::instruction::{FlushRegions, InstructionReply, SimpleReply};
16use common_telemetry::warn;
17use futures_util::future::BoxFuture;
18use store_api::region_request::{RegionFlushRequest, RegionRequest};
19use store_api::storage::RegionId;
20
21use crate::error;
22use crate::heartbeat::handler::HandlerContext;
23
24impl HandlerContext {
25    pub(crate) fn handle_flush_regions_instruction(
26        self,
27        flush_regions: FlushRegions,
28    ) -> BoxFuture<'static, Option<InstructionReply>> {
29        Box::pin(async move {
30            for region_id in flush_regions.region_ids {
31                let request = RegionRequest::Flush(RegionFlushRequest {
32                    row_group_size: None,
33                });
34                let result = self.region_server.handle_request(region_id, request).await;
35
36                match result {
37                    Ok(_) => {}
38                    Err(error::Error::RegionNotFound { .. }) => {
39                        warn!("Received a flush region instruction from meta, but target region: {region_id} is not found.");
40                    }
41                    Err(err) => {
42                        warn!(
43                            "Failed to flush region: {region_id}, error: {err}",
44                            region_id = region_id,
45                            err = err,
46                        );
47                    }
48                }
49            }
50            None
51        })
52    }
53
54    pub(crate) fn handle_flush_region_instruction(
55        self,
56        region_id: RegionId,
57    ) -> BoxFuture<'static, Option<InstructionReply>> {
58        Box::pin(async move {
59            let Some(writable) = self.region_server.is_region_leader(region_id) else {
60                return Some(InstructionReply::FlushRegion(SimpleReply {
61                    result: false,
62                    error: Some("Region is not leader".to_string()),
63                }));
64            };
65
66            if !writable {
67                return Some(InstructionReply::FlushRegion(SimpleReply {
68                    result: false,
69                    error: Some("Region is not writable".to_string()),
70                }));
71            }
72
73            let region_server_moved = self.region_server.clone();
74            let register_result = self
75                .flush_tasks
76                .try_register(
77                    region_id,
78                    Box::pin(async move {
79                        let request = RegionRequest::Flush(RegionFlushRequest {
80                            row_group_size: None,
81                        });
82                        region_server_moved
83                            .handle_request(region_id, request)
84                            .await?;
85                        Ok(())
86                    }),
87                )
88                .await;
89            if register_result.is_busy() {
90                warn!("Another flush task is running for the region: {region_id}");
91            }
92            let mut watcher = register_result.into_watcher();
93            let result = self.flush_tasks.wait_until_finish(&mut watcher).await;
94            match result {
95                Ok(()) => Some(InstructionReply::FlushRegion(SimpleReply {
96                    result: true,
97                    error: None,
98                })),
99                Err(err) => Some(InstructionReply::FlushRegion(SimpleReply {
100                    result: false,
101                    error: Some(format!("{err:?}")),
102                })),
103            }
104        })
105    }
106}
107
108#[cfg(test)]
109mod tests {
110    use std::sync::{Arc, RwLock};
111
112    use common_meta::instruction::FlushRegions;
113    use mito2::engine::MITO_ENGINE_NAME;
114    use store_api::storage::RegionId;
115
116    use super::*;
117    use crate::tests::{mock_region_server, MockRegionEngine};
118
119    #[tokio::test]
120    async fn test_handle_flush_region_instruction() {
121        let flushed_region_ids: Arc<RwLock<Vec<RegionId>>> = Arc::new(RwLock::new(Vec::new()));
122
123        let mock_region_server = mock_region_server();
124        let region_ids = (0..16).map(|i| RegionId::new(1024, i)).collect::<Vec<_>>();
125        for region_id in &region_ids {
126            let flushed_region_ids_ref = flushed_region_ids.clone();
127            let (mock_engine, _) =
128                MockRegionEngine::with_custom_apply_fn(MITO_ENGINE_NAME, move |region_engine| {
129                    region_engine.handle_request_mock_fn =
130                        Some(Box::new(move |region_id, _request| {
131                            flushed_region_ids_ref.write().unwrap().push(region_id);
132                            Ok(0)
133                        }))
134                });
135            mock_region_server.register_test_region(*region_id, mock_engine);
136        }
137        let handler_context = HandlerContext::new_for_test(mock_region_server);
138
139        let reply = handler_context
140            .clone()
141            .handle_flush_regions_instruction(FlushRegions {
142                region_ids: region_ids.clone(),
143            })
144            .await;
145        assert!(reply.is_none());
146        assert_eq!(*flushed_region_ids.read().unwrap(), region_ids);
147
148        flushed_region_ids.write().unwrap().clear();
149        let not_found_region_ids = (0..2).map(|i| RegionId::new(2048, i)).collect::<Vec<_>>();
150        let reply = handler_context
151            .handle_flush_regions_instruction(FlushRegions {
152                region_ids: not_found_region_ids.clone(),
153            })
154            .await;
155        assert!(reply.is_none());
156        assert!(flushed_region_ids.read().unwrap().is_empty());
157    }
158}