datanode/heartbeat/handler/
sync_region.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use common_meta::instruction::{InstructionReply, SyncRegion, SyncRegionReply, SyncRegionsReply};
16use common_telemetry::{debug, error, info, warn};
17use futures::future::join_all;
18
19use crate::heartbeat::handler::{HandlerContext, InstructionHandler};
20
21/// Handler for [SyncRegion] instruction.
22/// It syncs the region from a manifest or another region.
23#[derive(Debug, Clone, Copy, Default)]
24pub struct SyncRegionHandler;
25
26#[async_trait::async_trait]
27impl InstructionHandler for SyncRegionHandler {
28    type Instruction = Vec<SyncRegion>;
29
30    /// Handles a batch of [SyncRegion] instructions.
31    async fn handle(
32        &self,
33        ctx: &HandlerContext,
34        regions: Self::Instruction,
35    ) -> Option<InstructionReply> {
36        info!("Received sync region instructions: {:?}", regions);
37        let futures = regions
38            .into_iter()
39            .map(|sync_region| Self::handle_sync_region(ctx, sync_region));
40        let results = join_all(futures).await;
41        debug!("Sync region results: {:?}", results);
42
43        Some(InstructionReply::SyncRegions(SyncRegionsReply::new(
44            results,
45        )))
46    }
47}
48
49impl SyncRegionHandler {
50    /// Handles a single [SyncRegion] instruction.
51    async fn handle_sync_region(
52        ctx: &HandlerContext,
53        SyncRegion { region_id, request }: SyncRegion,
54    ) -> SyncRegionReply {
55        let Some(writable) = ctx.region_server.is_region_leader(region_id) else {
56            warn!("Region: {} is not found", region_id);
57            return SyncRegionReply {
58                region_id,
59                ready: false,
60                exists: false,
61                error: None,
62            };
63        };
64
65        if !writable {
66            warn!("Region: {} is not writable", region_id);
67            return SyncRegionReply {
68                region_id,
69                ready: false,
70                exists: true,
71                error: Some("Region is not writable".into()),
72            };
73        }
74
75        match ctx.region_server.sync_region(region_id, request).await {
76            Ok(_) => {
77                info!("Successfully synced region: {}", region_id);
78                SyncRegionReply {
79                    region_id,
80                    ready: true,
81                    exists: true,
82                    error: None,
83                }
84            }
85            Err(e) => {
86                error!(e; "Failed to sync region: {}", region_id);
87                SyncRegionReply {
88                    region_id,
89                    ready: false,
90                    exists: true,
91                    error: Some(format!("{:?}", e)),
92                }
93            }
94        }
95    }
96}
97
98#[cfg(test)]
99mod tests {
100    use std::sync::Arc;
101
102    use common_meta::kv_backend::memory::MemoryKvBackend;
103    use store_api::metric_engine_consts::METRIC_ENGINE_NAME;
104    use store_api::region_engine::{RegionRole, SyncRegionFromRequest};
105    use store_api::storage::RegionId;
106
107    use crate::heartbeat::handler::sync_region::SyncRegionHandler;
108    use crate::heartbeat::handler::{HandlerContext, InstructionHandler};
109    use crate::tests::{MockRegionEngine, mock_region_server};
110
111    #[tokio::test]
112    async fn test_handle_sync_region_not_found() {
113        let mut mock_region_server = mock_region_server();
114        let (mock_engine, _) = MockRegionEngine::new(METRIC_ENGINE_NAME);
115        mock_region_server.register_engine(mock_engine);
116
117        let kv_backend = Arc::new(MemoryKvBackend::new());
118        let handler_context = HandlerContext::new_for_test(mock_region_server, kv_backend);
119        let handler = SyncRegionHandler;
120
121        let region_id = RegionId::new(1024, 1);
122        let sync_region = common_meta::instruction::SyncRegion {
123            region_id,
124            request: SyncRegionFromRequest::from_manifest(Default::default()),
125        };
126
127        let reply = handler
128            .handle(&handler_context, vec![sync_region])
129            .await
130            .unwrap()
131            .expect_sync_regions_reply();
132
133        assert_eq!(reply.len(), 1);
134        assert_eq!(reply[0].region_id, region_id);
135        assert!(!reply[0].exists);
136        assert!(!reply[0].ready);
137    }
138
139    #[tokio::test]
140    async fn test_handle_sync_region_not_writable() {
141        let mock_region_server = mock_region_server();
142        let region_id = RegionId::new(1024, 1);
143        let (mock_engine, _) = MockRegionEngine::with_custom_apply_fn(METRIC_ENGINE_NAME, |r| {
144            r.mock_role = Some(Some(RegionRole::Follower));
145        });
146        mock_region_server.register_test_region(region_id, mock_engine);
147
148        let kv_backend = Arc::new(MemoryKvBackend::new());
149        let handler_context = HandlerContext::new_for_test(mock_region_server, kv_backend);
150        let handler = SyncRegionHandler;
151
152        let sync_region = common_meta::instruction::SyncRegion {
153            region_id,
154            request: SyncRegionFromRequest::from_manifest(Default::default()),
155        };
156
157        let reply = handler
158            .handle(&handler_context, vec![sync_region])
159            .await
160            .unwrap()
161            .expect_sync_regions_reply();
162
163        assert_eq!(reply.len(), 1);
164        assert_eq!(reply[0].region_id, region_id);
165        assert!(reply[0].exists);
166        assert!(!reply[0].ready);
167        assert!(reply[0].error.is_some());
168    }
169
170    #[tokio::test]
171    async fn test_handle_sync_region_success() {
172        let mock_region_server = mock_region_server();
173        let region_id = RegionId::new(1024, 1);
174        let (mock_engine, _) = MockRegionEngine::with_custom_apply_fn(METRIC_ENGINE_NAME, |r| {
175            r.mock_role = Some(Some(RegionRole::Leader));
176        });
177        mock_region_server.register_test_region(region_id, mock_engine);
178
179        let kv_backend = Arc::new(MemoryKvBackend::new());
180        let handler_context = HandlerContext::new_for_test(mock_region_server, kv_backend);
181        let handler = SyncRegionHandler;
182
183        let sync_region = common_meta::instruction::SyncRegion {
184            region_id,
185            request: SyncRegionFromRequest::from_manifest(Default::default()),
186        };
187
188        let reply = handler
189            .handle(&handler_context, vec![sync_region])
190            .await
191            .unwrap()
192            .expect_sync_regions_reply();
193
194        assert_eq!(reply.len(), 1);
195        assert_eq!(reply[0].region_id, region_id);
196        assert!(reply[0].exists);
197        assert!(reply[0].ready);
198        assert!(reply[0].error.is_none());
199    }
200}