datanode/heartbeat/handler/
sync_region.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use common_meta::instruction::{InstructionReply, SyncRegion, SyncRegionReply, SyncRegionsReply};
16use common_telemetry::{error, info, warn};
17use futures::future::join_all;
18
19use crate::heartbeat::handler::{HandlerContext, InstructionHandler};
20
/// Heartbeat instruction handler for [`SyncRegion`].
///
/// Each instruction asks the datanode to sync one region, either from a
/// manifest or from another region. The handler itself carries no state.
#[derive(Clone, Copy, Debug, Default)]
pub struct SyncRegionHandler;
25
26#[async_trait::async_trait]
27impl InstructionHandler for SyncRegionHandler {
28    type Instruction = Vec<SyncRegion>;
29
30    /// Handles a batch of [SyncRegion] instructions.
31    async fn handle(
32        &self,
33        ctx: &HandlerContext,
34        regions: Self::Instruction,
35    ) -> Option<InstructionReply> {
36        let futures = regions
37            .into_iter()
38            .map(|sync_region| Self::handle_sync_region(ctx, sync_region));
39        let results = join_all(futures).await;
40
41        Some(InstructionReply::SyncRegions(SyncRegionsReply::new(
42            results,
43        )))
44    }
45}
46
47impl SyncRegionHandler {
48    /// Handles a single [SyncRegion] instruction.
49    async fn handle_sync_region(
50        ctx: &HandlerContext,
51        SyncRegion { region_id, request }: SyncRegion,
52    ) -> SyncRegionReply {
53        let Some(writable) = ctx.region_server.is_region_leader(region_id) else {
54            warn!("Region: {} is not found", region_id);
55            return SyncRegionReply {
56                region_id,
57                ready: false,
58                exists: false,
59                error: None,
60            };
61        };
62
63        if !writable {
64            warn!("Region: {} is not writable", region_id);
65            return SyncRegionReply {
66                region_id,
67                ready: false,
68                exists: true,
69                error: Some("Region is not writable".into()),
70            };
71        }
72
73        match ctx.region_server.sync_region(region_id, request).await {
74            Ok(_) => {
75                info!("Successfully synced region: {}", region_id);
76                SyncRegionReply {
77                    region_id,
78                    ready: true,
79                    exists: true,
80                    error: None,
81                }
82            }
83            Err(e) => {
84                error!(e; "Failed to sync region: {}", region_id);
85                SyncRegionReply {
86                    region_id,
87                    ready: false,
88                    exists: true,
89                    error: Some(format!("{:?}", e)),
90                }
91            }
92        }
93    }
94}
95
#[cfg(test)]
mod tests {
    use store_api::metric_engine_consts::METRIC_ENGINE_NAME;
    use store_api::region_engine::{RegionRole, SyncRegionFromRequest};
    use store_api::storage::RegionId;

    use crate::heartbeat::handler::sync_region::SyncRegionHandler;
    use crate::heartbeat::handler::{HandlerContext, InstructionHandler};
    use crate::tests::{MockRegionEngine, mock_region_server};

    /// Builds the instruction shared by all tests: sync `region_id` from a
    /// manifest request constructed with default parameters.
    fn sync_from_manifest(region_id: RegionId) -> common_meta::instruction::SyncRegion {
        common_meta::instruction::SyncRegion {
            region_id,
            request: SyncRegionFromRequest::from_manifest(Default::default()),
        }
    }

    /// A region that was never registered is reported as neither existing nor ready.
    #[tokio::test]
    async fn test_handle_sync_region_not_found() {
        let region_id = RegionId::new(1024, 1);

        // Only the engine is registered; no region is added to the server.
        let mut region_server = mock_region_server();
        let (engine, _) = MockRegionEngine::new(METRIC_ENGINE_NAME);
        region_server.register_engine(engine);

        let ctx = HandlerContext::new_for_test(region_server);
        let handler = SyncRegionHandler;

        let replies = handler
            .handle(&ctx, vec![sync_from_manifest(region_id)])
            .await
            .unwrap()
            .expect_sync_regions_reply();

        assert_eq!(replies.len(), 1);
        let first = &replies[0];
        assert_eq!(first.region_id, region_id);
        assert!(!first.exists);
        assert!(!first.ready);
    }

    /// A region whose mocked role is `Follower` exists but is rejected with an error.
    #[tokio::test]
    async fn test_handle_sync_region_not_writable() {
        let region_id = RegionId::new(1024, 1);

        let region_server = mock_region_server();
        let (engine, _) = MockRegionEngine::with_custom_apply_fn(METRIC_ENGINE_NAME, |r| {
            r.mock_role = Some(Some(RegionRole::Follower));
        });
        region_server.register_test_region(region_id, engine);

        let ctx = HandlerContext::new_for_test(region_server);
        let handler = SyncRegionHandler;

        let replies = handler
            .handle(&ctx, vec![sync_from_manifest(region_id)])
            .await
            .unwrap()
            .expect_sync_regions_reply();

        assert_eq!(replies.len(), 1);
        let first = &replies[0];
        assert_eq!(first.region_id, region_id);
        assert!(first.exists);
        assert!(!first.ready);
        assert!(first.error.is_some());
    }

    /// A region whose mocked role is `Leader` is synced and reported ready without error.
    #[tokio::test]
    async fn test_handle_sync_region_success() {
        let region_id = RegionId::new(1024, 1);

        let region_server = mock_region_server();
        let (engine, _) = MockRegionEngine::with_custom_apply_fn(METRIC_ENGINE_NAME, |r| {
            r.mock_role = Some(Some(RegionRole::Leader));
        });
        region_server.register_test_region(region_id, engine);

        let ctx = HandlerContext::new_for_test(region_server);
        let handler = SyncRegionHandler;

        let replies = handler
            .handle(&ctx, vec![sync_from_manifest(region_id)])
            .await
            .unwrap()
            .expect_sync_regions_reply();

        assert_eq!(replies.len(), 1);
        let first = &replies[0];
        assert_eq!(first.region_id, region_id);
        assert!(first.exists);
        assert!(first.ready);
        assert!(first.error.is_none());
    }
}