datanode/heartbeat/handler/
sync_region.rs1use common_meta::instruction::{InstructionReply, SyncRegion, SyncRegionReply, SyncRegionsReply};
16use common_telemetry::{error, info, warn};
17use futures::future::join_all;
18
19use crate::heartbeat::handler::{HandlerContext, InstructionHandler};
20
/// Stateless handler for batches of [`SyncRegion`] instructions.
///
/// The type carries no data; it exists so the [`InstructionHandler`] implementation
/// for `Vec<SyncRegion>` has something to hang off.
#[derive(Default, Clone, Copy, Debug)]
pub struct SyncRegionHandler;
25
26#[async_trait::async_trait]
27impl InstructionHandler for SyncRegionHandler {
28 type Instruction = Vec<SyncRegion>;
29
30 async fn handle(
32 &self,
33 ctx: &HandlerContext,
34 regions: Self::Instruction,
35 ) -> Option<InstructionReply> {
36 let futures = regions
37 .into_iter()
38 .map(|sync_region| Self::handle_sync_region(ctx, sync_region));
39 let results = join_all(futures).await;
40
41 Some(InstructionReply::SyncRegions(SyncRegionsReply::new(
42 results,
43 )))
44 }
45}
46
47impl SyncRegionHandler {
48 async fn handle_sync_region(
50 ctx: &HandlerContext,
51 SyncRegion { region_id, request }: SyncRegion,
52 ) -> SyncRegionReply {
53 let Some(writable) = ctx.region_server.is_region_leader(region_id) else {
54 warn!("Region: {} is not found", region_id);
55 return SyncRegionReply {
56 region_id,
57 ready: false,
58 exists: false,
59 error: None,
60 };
61 };
62
63 if !writable {
64 warn!("Region: {} is not writable", region_id);
65 return SyncRegionReply {
66 region_id,
67 ready: false,
68 exists: true,
69 error: Some("Region is not writable".into()),
70 };
71 }
72
73 match ctx.region_server.sync_region(region_id, request).await {
74 Ok(_) => {
75 info!("Successfully synced region: {}", region_id);
76 SyncRegionReply {
77 region_id,
78 ready: true,
79 exists: true,
80 error: None,
81 }
82 }
83 Err(e) => {
84 error!(e; "Failed to sync region: {}", region_id);
85 SyncRegionReply {
86 region_id,
87 ready: false,
88 exists: true,
89 error: Some(format!("{:?}", e)),
90 }
91 }
92 }
93 }
94}
95
#[cfg(test)]
mod tests {
    use store_api::metric_engine_consts::METRIC_ENGINE_NAME;
    use store_api::region_engine::{RegionRole, SyncRegionFromRequest};
    use store_api::storage::RegionId;

    use crate::heartbeat::handler::sync_region::SyncRegionHandler;
    use crate::heartbeat::handler::{HandlerContext, InstructionHandler};
    use crate::tests::{MockRegionEngine, mock_region_server};

    /// Builds the instruction every test sends: a `SyncRegion` for `region_id` whose
    /// request comes from `SyncRegionFromRequest::from_manifest` with a default argument.
    fn manifest_sync_instruction(region_id: RegionId) -> common_meta::instruction::SyncRegion {
        common_meta::instruction::SyncRegion {
            region_id,
            request: SyncRegionFromRequest::from_manifest(Default::default()),
        }
    }

    /// Only an engine is registered, no region: the reply must say the region neither
    /// exists nor is ready.
    #[tokio::test]
    async fn test_handle_sync_region_not_found() {
        let region_id = RegionId::new(1024, 1);

        let mut region_server = mock_region_server();
        let (engine, _) = MockRegionEngine::new(METRIC_ENGINE_NAME);
        region_server.register_engine(engine);

        let ctx = HandlerContext::new_for_test(region_server);
        let handler = SyncRegionHandler;

        let replies = handler
            .handle(&ctx, vec![manifest_sync_instruction(region_id)])
            .await
            .unwrap()
            .expect_sync_regions_reply();

        assert_eq!(replies.len(), 1);
        let first = &replies[0];
        assert_eq!(first.region_id, region_id);
        assert!(!first.exists);
        assert!(!first.ready);
    }

    /// The region is registered with a mocked `Follower` role: the reply must say it
    /// exists, is not ready, and carries an error.
    #[tokio::test]
    async fn test_handle_sync_region_not_writable() {
        let region_id = RegionId::new(1024, 1);

        let region_server = mock_region_server();
        let (engine, _) = MockRegionEngine::with_custom_apply_fn(METRIC_ENGINE_NAME, |r| {
            r.mock_role = Some(Some(RegionRole::Follower));
        });
        region_server.register_test_region(region_id, engine);

        let ctx = HandlerContext::new_for_test(region_server);
        let handler = SyncRegionHandler;

        let replies = handler
            .handle(&ctx, vec![manifest_sync_instruction(region_id)])
            .await
            .unwrap()
            .expect_sync_regions_reply();

        assert_eq!(replies.len(), 1);
        let first = &replies[0];
        assert_eq!(first.region_id, region_id);
        assert!(first.exists);
        assert!(!first.ready);
        assert!(first.error.is_some());
    }

    /// The region is registered with a mocked `Leader` role: the reply must say it
    /// exists, is ready, and has no error.
    #[tokio::test]
    async fn test_handle_sync_region_success() {
        let region_id = RegionId::new(1024, 1);

        let region_server = mock_region_server();
        let (engine, _) = MockRegionEngine::with_custom_apply_fn(METRIC_ENGINE_NAME, |r| {
            r.mock_role = Some(Some(RegionRole::Leader));
        });
        region_server.register_test_region(region_id, engine);

        let ctx = HandlerContext::new_for_test(region_server);
        let handler = SyncRegionHandler;

        let replies = handler
            .handle(&ctx, vec![manifest_sync_instruction(region_id)])
            .await
            .unwrap()
            .expect_sync_regions_reply();

        assert_eq!(replies.len(), 1);
        let first = &replies[0];
        assert_eq!(first.region_id, region_id);
        assert!(first.exists);
        assert!(first.ready);
        assert!(first.error.is_none());
    }
}