1use std::collections::HashMap;
16
17use api::v1::region::region_request::Body as PbRegionRequest;
18use api::v1::region::{ListMetadataRequest, RegionRequest, RegionRequestHeader};
19use common_telemetry::tracing_context::TracingContext;
20use futures::future::join_all;
21use snafu::ResultExt;
22use store_api::metadata::RegionMetadata;
23use store_api::storage::{RegionId, TableId};
24
25use crate::ddl::utils::add_peer_context_if_needed;
26use crate::error::{DecodeJsonSnafu, Result};
27use crate::node_manager::NodeManagerRef;
28use crate::rpc::router::{find_leaders, region_distribution, RegionRoute};
29
30pub struct RegionMetadataLister {
32 node_manager: NodeManagerRef,
33}
34
35impl RegionMetadataLister {
36 pub fn new(node_manager: NodeManagerRef) -> Self {
38 Self { node_manager }
39 }
40
41 pub async fn list(
43 &self,
44 table_id: TableId,
45 region_routes: &[RegionRoute],
46 ) -> Result<Vec<Option<RegionMetadata>>> {
47 let region_distribution = region_distribution(region_routes);
48 let leaders = find_leaders(region_routes)
49 .into_iter()
50 .map(|p| (p.id, p))
51 .collect::<HashMap<_, _>>();
52
53 let total_num_region = region_distribution
54 .values()
55 .map(|r| r.leader_regions.len())
56 .sum::<usize>();
57
58 let mut list_metadata_tasks = Vec::with_capacity(leaders.len());
59
60 for (datanode_id, region_role_set) in region_distribution {
62 if region_role_set.leader_regions.is_empty() {
63 continue;
64 }
65 let peer = leaders.get(&datanode_id).unwrap();
67 let requester = self.node_manager.datanode(peer).await;
68 let region_ids = region_role_set
69 .leader_regions
70 .iter()
71 .map(|r| RegionId::new(table_id, *r).as_u64())
72 .collect();
73 let request = Self::build_list_metadata_request(region_ids);
74
75 let peer = peer.clone();
76 list_metadata_tasks.push(async move {
77 requester
78 .handle(request)
79 .await
80 .map_err(add_peer_context_if_needed(peer))
81 });
82 }
83
84 let results = join_all(list_metadata_tasks)
85 .await
86 .into_iter()
87 .collect::<Result<Vec<_>>>()?
88 .into_iter()
89 .map(|r| r.metadata);
90
91 let mut output = Vec::with_capacity(total_num_region);
92 for result in results {
93 let region_metadatas: Vec<Option<RegionMetadata>> =
94 serde_json::from_slice(&result).context(DecodeJsonSnafu)?;
95 output.extend(region_metadatas);
96 }
97
98 Ok(output)
99 }
100
101 fn build_list_metadata_request(region_ids: Vec<u64>) -> RegionRequest {
102 RegionRequest {
103 header: Some(RegionRequestHeader {
104 tracing_context: TracingContext::from_current_span().to_w3c(),
105 ..Default::default()
106 }),
107 body: Some(PbRegionRequest::ListMetadata(ListMetadataRequest {
108 region_ids,
109 })),
110 }
111 }
112}
113
#[cfg(test)]
mod tests {
    use std::collections::HashMap;
    use std::sync::Arc;

    use api::region::RegionResponse;
    use api::v1::meta::Peer;
    use api::v1::region::region_request::Body;
    use api::v1::region::RegionRequest;
    use store_api::metadata::RegionMetadata;
    use store_api::storage::RegionId;
    use tokio::sync::mpsc;

    use crate::ddl::test_util::datanode_handler::{DatanodeWatcher, ListMetadataDatanodeHandler};
    use crate::ddl::test_util::region_metadata::build_region_metadata;
    use crate::ddl::test_util::test_column_metadatas;
    use crate::ddl::utils::region_metadata_lister::RegionMetadataLister;
    use crate::error::Result;
    use crate::rpc::router::{Region, RegionRoute};
    use crate::test_util::MockDatanodeManager;

    /// Asserts that `req` is a `ListMetadata` request carrying as many region ids
    /// as `expected_region_ids` and containing each of them (order-insensitive).
    fn assert_list_metadata_request(req: RegionRequest, expected_region_ids: &[RegionId]) {
        let Some(Body::ListMetadata(req)) = req.body else {
            panic!("expected a ListMetadata request body")
        };

        assert_eq!(req.region_ids.len(), expected_region_ids.len());
        for region_id in expected_region_ids {
            assert!(req.region_ids.contains(&region_id.as_u64()));
        }
    }

    /// Mock handler that answers a `ListMetadata` request with one `None` entry
    /// per requested region id, serialized as JSON.
    fn empty_list_metadata_handler(_peer: Peer, request: RegionRequest) -> Result<RegionResponse> {
        let Some(Body::ListMetadata(req)) = request.body else {
            panic!("expected a ListMetadata request body")
        };

        let output: Vec<Option<RegionMetadata>> =
            req.region_ids.iter().map(|_| None).collect();

        Ok(RegionResponse::from_metadata(
            serde_json::to_vec(&output).unwrap(),
        ))
    }

    /// Checks which requests are sent: one per datanode that leads a region,
    /// each listing exactly the regions that datanode leads.
    #[tokio::test]
    async fn test_list_request() {
        let (tx, mut rx) = mpsc::channel(8);
        let handler = DatanodeWatcher::new(tx).with_handler(empty_list_metadata_handler);
        let node_manager = Arc::new(MockDatanodeManager::new(handler));
        let lister = RegionMetadataLister::new(node_manager);
        // Datanode 1 leads region 1 and datanode 3 leads regions 2 and 3;
        // datanodes 4 and 5 appear only as followers.
        let region_routes = vec![
            RegionRoute {
                region: Region::new_test(RegionId::new(1024, 1)),
                leader_peer: Some(Peer::empty(1)),
                follower_peers: vec![Peer::empty(5)],
                leader_state: None,
                leader_down_since: None,
            },
            RegionRoute {
                region: Region::new_test(RegionId::new(1024, 2)),
                leader_peer: Some(Peer::empty(3)),
                follower_peers: vec![Peer::empty(4)],
                leader_state: None,
                leader_down_since: None,
            },
            RegionRoute {
                region: Region::new_test(RegionId::new(1024, 3)),
                leader_peer: Some(Peer::empty(3)),
                follower_peers: vec![Peer::empty(4)],
                leader_state: None,
                leader_down_since: None,
            },
        ];
        let region_metadatas = lister.list(1024, &region_routes).await.unwrap();
        assert_eq!(region_metadatas.len(), 3);

        // Exactly two requests must have been sent: a third receive has to fail.
        let (first_peer, first_request) = rx.try_recv().unwrap();
        let (second_peer, second_request) = rx.try_recv().unwrap();
        rx.try_recv().unwrap_err();

        // The request for datanode 1 is expected first, then the one for datanode 3.
        assert_eq!(first_peer.id, 1);
        assert_list_metadata_request(first_request, &[RegionId::new(1024, 1)]);
        assert_eq!(second_peer.id, 3);
        assert_list_metadata_request(
            second_request,
            &[RegionId::new(1024, 2), RegionId::new(1024, 3)],
        );
    }

    /// Checks that decoded metadata from several datanodes is merged into one
    /// list, including `None` entries.
    #[tokio::test]
    async fn test_list_region_metadata() {
        let region_metadata =
            build_region_metadata(RegionId::new(1024, 1), &test_column_metadatas(&["tag_0"]));
        // Region 0 has no metadata registered with the handler; region 1 does.
        let region_metadatas = HashMap::from([
            (RegionId::new(1024, 0), None),
            (RegionId::new(1024, 1), Some(region_metadata.clone())),
        ]);
        let handler = ListMetadataDatanodeHandler::new(region_metadatas);
        let node_manager = Arc::new(MockDatanodeManager::new(handler));
        let lister = RegionMetadataLister::new(node_manager);
        let region_routes = vec![
            RegionRoute {
                region: Region::new_test(RegionId::new(1024, 0)),
                leader_peer: Some(Peer::empty(1)),
                follower_peers: vec![],
                leader_state: None,
                leader_down_since: None,
            },
            RegionRoute {
                region: Region::new_test(RegionId::new(1024, 1)),
                leader_peer: Some(Peer::empty(3)),
                follower_peers: vec![],
                leader_state: None,
                leader_down_since: None,
            },
        ];
        let region_metadatas = lister.list(1024, &region_routes).await.unwrap();
        assert_eq!(region_metadatas.len(), 2);
        assert_eq!(region_metadatas[0], None);
        assert_eq!(region_metadatas[1], Some(region_metadata));
    }
}