datanode/heartbeat/handler/
sync_region.rs1use common_meta::instruction::{InstructionReply, SyncRegion, SyncRegionReply, SyncRegionsReply};
16use common_telemetry::{debug, error, info, warn};
17use futures::future::join_all;
18
19use crate::heartbeat::handler::{HandlerContext, InstructionHandler};
20
/// Stateless handler for `SyncRegion` heartbeat instructions.
///
/// The type holds no data, so it is cheap to copy and can be created with
/// `SyncRegionHandler` or `SyncRegionHandler::default()`.
#[derive(Clone, Copy, Debug, Default)]
pub struct SyncRegionHandler;
25
26#[async_trait::async_trait]
27impl InstructionHandler for SyncRegionHandler {
28 type Instruction = Vec<SyncRegion>;
29
30 async fn handle(
32 &self,
33 ctx: &HandlerContext,
34 regions: Self::Instruction,
35 ) -> Option<InstructionReply> {
36 info!("Received sync region instructions: {:?}", regions);
37 let futures = regions
38 .into_iter()
39 .map(|sync_region| Self::handle_sync_region(ctx, sync_region));
40 let results = join_all(futures).await;
41 debug!("Sync region results: {:?}", results);
42
43 Some(InstructionReply::SyncRegions(SyncRegionsReply::new(
44 results,
45 )))
46 }
47}
48
49impl SyncRegionHandler {
50 async fn handle_sync_region(
52 ctx: &HandlerContext,
53 SyncRegion { region_id, request }: SyncRegion,
54 ) -> SyncRegionReply {
55 let Some(writable) = ctx.region_server.is_region_leader(region_id) else {
56 warn!("Region: {} is not found", region_id);
57 return SyncRegionReply {
58 region_id,
59 ready: false,
60 exists: false,
61 error: None,
62 };
63 };
64
65 if !writable {
66 warn!("Region: {} is not writable", region_id);
67 return SyncRegionReply {
68 region_id,
69 ready: false,
70 exists: true,
71 error: Some("Region is not writable".into()),
72 };
73 }
74
75 match ctx.region_server.sync_region(region_id, request).await {
76 Ok(_) => {
77 info!("Successfully synced region: {}", region_id);
78 SyncRegionReply {
79 region_id,
80 ready: true,
81 exists: true,
82 error: None,
83 }
84 }
85 Err(e) => {
86 error!(e; "Failed to sync region: {}", region_id);
87 SyncRegionReply {
88 region_id,
89 ready: false,
90 exists: true,
91 error: Some(format!("{:?}", e)),
92 }
93 }
94 }
95 }
96}
97
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use common_meta::kv_backend::memory::MemoryKvBackend;
    use store_api::metric_engine_consts::METRIC_ENGINE_NAME;
    use store_api::region_engine::{RegionRole, SyncRegionFromRequest};
    use store_api::storage::RegionId;

    use crate::heartbeat::handler::sync_region::SyncRegionHandler;
    use crate::heartbeat::handler::{HandlerContext, InstructionHandler};
    use crate::tests::{MockRegionEngine, mock_region_server};

    /// Region id shared by all test cases in this module.
    fn target_region() -> RegionId {
        RegionId::new(1024, 1)
    }

    /// Builds a `SyncRegion` instruction for `region_id` whose request comes
    /// from `SyncRegionFromRequest::from_manifest` with a default argument.
    fn manifest_sync(region_id: RegionId) -> common_meta::instruction::SyncRegion {
        common_meta::instruction::SyncRegion {
            region_id,
            request: SyncRegionFromRequest::from_manifest(Default::default()),
        }
    }

    /// Only an engine is registered with the server, never the region itself,
    /// so the handler must report that the region does not exist.
    #[tokio::test]
    async fn test_handle_sync_region_not_found() {
        let region_id = target_region();

        let mut server = mock_region_server();
        let (engine, _) = MockRegionEngine::new(METRIC_ENGINE_NAME);
        server.register_engine(engine);

        let backend = Arc::new(MemoryKvBackend::new());
        let ctx = HandlerContext::new_for_test(server, backend);

        let reply = SyncRegionHandler
            .handle(&ctx, vec![manifest_sync(region_id)])
            .await
            .unwrap()
            .expect_sync_regions_reply();

        assert_eq!(reply.len(), 1);
        let first = &reply[0];
        assert_eq!(first.region_id, region_id);
        assert!(!first.exists);
        assert!(!first.ready);
    }

    /// The region is registered with a mocked `Follower` role, so it exists
    /// but the handler must refuse to sync it and attach an error.
    #[tokio::test]
    async fn test_handle_sync_region_not_writable() {
        let region_id = target_region();

        let server = mock_region_server();
        let (engine, _) = MockRegionEngine::with_custom_apply_fn(METRIC_ENGINE_NAME, |mock| {
            mock.mock_role = Some(Some(RegionRole::Follower));
        });
        server.register_test_region(region_id, engine);

        let backend = Arc::new(MemoryKvBackend::new());
        let ctx = HandlerContext::new_for_test(server, backend);

        let reply = SyncRegionHandler
            .handle(&ctx, vec![manifest_sync(region_id)])
            .await
            .unwrap()
            .expect_sync_regions_reply();

        assert_eq!(reply.len(), 1);
        let first = &reply[0];
        assert_eq!(first.region_id, region_id);
        assert!(first.exists);
        assert!(!first.ready);
        assert!(first.error.is_some());
    }

    /// The region is registered with a mocked `Leader` role, so the sync is
    /// expected to go through and the reply must be ready without an error.
    #[tokio::test]
    async fn test_handle_sync_region_success() {
        let region_id = target_region();

        let server = mock_region_server();
        let (engine, _) = MockRegionEngine::with_custom_apply_fn(METRIC_ENGINE_NAME, |mock| {
            mock.mock_role = Some(Some(RegionRole::Leader));
        });
        server.register_test_region(region_id, engine);

        let backend = Arc::new(MemoryKvBackend::new());
        let ctx = HandlerContext::new_for_test(server, backend);

        let reply = SyncRegionHandler
            .handle(&ctx, vec![manifest_sync(region_id)])
            .await
            .unwrap()
            .expect_sync_regions_reply();

        assert_eq!(reply.len(), 1);
        let first = &reply[0];
        assert_eq!(first.region_id, region_id);
        assert!(first.exists);
        assert!(first.ready);
        assert!(first.error.is_none());
    }
}