common_meta/ddl/utils/
region_metadata_lister.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16
17use api::v1::region::region_request::Body as PbRegionRequest;
18use api::v1::region::{ListMetadataRequest, RegionRequest, RegionRequestHeader};
19use common_telemetry::tracing_context::TracingContext;
20use futures::future::join_all;
21use snafu::ResultExt;
22use store_api::metadata::RegionMetadata;
23use store_api::storage::{RegionId, TableId};
24
25use crate::ddl::utils::add_peer_context_if_needed;
26use crate::error::{DecodeJsonSnafu, Result};
27use crate::node_manager::NodeManagerRef;
28use crate::rpc::router::{find_leaders, region_distribution, RegionRoute};
29
30/// Collects the region metadata from the datanodes.
31pub struct RegionMetadataLister {
32    node_manager: NodeManagerRef,
33}
34
35impl RegionMetadataLister {
36    /// Creates a new [`RegionMetadataLister`] with the given [`NodeManagerRef`].
37    pub fn new(node_manager: NodeManagerRef) -> Self {
38        Self { node_manager }
39    }
40
41    /// Collects the region metadata from the datanodes.
42    pub async fn list(
43        &self,
44        table_id: TableId,
45        region_routes: &[RegionRoute],
46    ) -> Result<Vec<Option<RegionMetadata>>> {
47        let region_distribution = region_distribution(region_routes);
48        let leaders = find_leaders(region_routes)
49            .into_iter()
50            .map(|p| (p.id, p))
51            .collect::<HashMap<_, _>>();
52
53        let total_num_region = region_distribution
54            .values()
55            .map(|r| r.leader_regions.len())
56            .sum::<usize>();
57
58        let mut list_metadata_tasks = Vec::with_capacity(leaders.len());
59
60        // Build requests.
61        for (datanode_id, region_role_set) in region_distribution {
62            if region_role_set.leader_regions.is_empty() {
63                continue;
64            }
65            // Safety: must exists.
66            let peer = leaders.get(&datanode_id).unwrap();
67            let requester = self.node_manager.datanode(peer).await;
68            let region_ids = region_role_set
69                .leader_regions
70                .iter()
71                .map(|r| RegionId::new(table_id, *r).as_u64())
72                .collect();
73            let request = Self::build_list_metadata_request(region_ids);
74
75            let peer = peer.clone();
76            list_metadata_tasks.push(async move {
77                requester
78                    .handle(request)
79                    .await
80                    .map_err(add_peer_context_if_needed(peer))
81            });
82        }
83
84        let results = join_all(list_metadata_tasks)
85            .await
86            .into_iter()
87            .collect::<Result<Vec<_>>>()?
88            .into_iter()
89            .map(|r| r.metadata);
90
91        let mut output = Vec::with_capacity(total_num_region);
92        for result in results {
93            let region_metadatas: Vec<Option<RegionMetadata>> =
94                serde_json::from_slice(&result).context(DecodeJsonSnafu)?;
95            output.extend(region_metadatas);
96        }
97
98        Ok(output)
99    }
100
101    fn build_list_metadata_request(region_ids: Vec<u64>) -> RegionRequest {
102        RegionRequest {
103            header: Some(RegionRequestHeader {
104                tracing_context: TracingContext::from_current_span().to_w3c(),
105                ..Default::default()
106            }),
107            body: Some(PbRegionRequest::ListMetadata(ListMetadataRequest {
108                region_ids,
109            })),
110        }
111    }
112}
113
#[cfg(test)]
mod tests {
    use std::collections::HashMap;
    use std::sync::Arc;

    use api::region::RegionResponse;
    use api::v1::meta::Peer;
    use api::v1::region::region_request::Body;
    use api::v1::region::RegionRequest;
    use store_api::metadata::RegionMetadata;
    use store_api::storage::RegionId;
    use tokio::sync::mpsc;

    use crate::ddl::test_util::datanode_handler::{DatanodeWatcher, ListMetadataDatanodeHandler};
    use crate::ddl::test_util::region_metadata::build_region_metadata;
    use crate::ddl::test_util::test_column_metadatas;
    use crate::ddl::utils::region_metadata_lister::RegionMetadataLister;
    use crate::error::Result;
    use crate::rpc::router::{Region, RegionRoute};
    use crate::test_util::MockDatanodeManager;

    /// Builds a [`RegionRoute`] for `region_id` with the given leader and
    /// followers; the leader state fields are left unset.
    fn new_route(region_id: RegionId, leader: Peer, followers: Vec<Peer>) -> RegionRoute {
        RegionRoute {
            region: Region::new_test(region_id),
            leader_peer: Some(leader),
            follower_peers: followers,
            leader_state: None,
            leader_down_since: None,
        }
    }

    /// Asserts that `req` is a `ListMetadata` request carrying exactly as many
    /// region ids as `expected_region_ids`, and that each expected id is present.
    fn assert_list_metadata_request(req: RegionRequest, expected_region_ids: &[RegionId]) {
        let list_request = match req.body {
            Some(Body::ListMetadata(inner)) => inner,
            _ => unreachable!(),
        };

        assert_eq!(list_request.region_ids.len(), expected_region_ids.len());
        for expected in expected_region_ids {
            assert!(list_request.region_ids.contains(&expected.as_u64()));
        }
    }

    /// Datanode handler that answers a `ListMetadata` request with one `None`
    /// entry per requested region id, serialized as JSON.
    fn empty_list_metadata_handler(_peer: Peer, request: RegionRequest) -> Result<RegionResponse> {
        let region_ids = match request.body {
            Some(Body::ListMetadata(inner)) => inner.region_ids,
            _ => unreachable!(),
        };

        let placeholders: Vec<Option<RegionMetadata>> =
            region_ids.iter().map(|_| None).collect();
        let payload = serde_json::to_vec(&placeholders).unwrap();

        Ok(RegionResponse::from_metadata(payload))
    }

    /// Verifies that requests are grouped per leader datanode and that follower
    /// peers receive nothing.
    #[tokio::test]
    async fn test_list_request() {
        let (tx, mut rx) = mpsc::channel(8);
        let handler = DatanodeWatcher::new(tx).with_handler(empty_list_metadata_handler);
        let node_manager = Arc::new(MockDatanodeManager::new(handler));
        let lister = RegionMetadataLister::new(node_manager);

        // Datanode 1 leads region 1; datanode 3 leads regions 2 and 3.
        let region_routes = vec![
            new_route(RegionId::new(1024, 1), Peer::empty(1), vec![Peer::empty(5)]),
            new_route(RegionId::new(1024, 2), Peer::empty(3), vec![Peer::empty(4)]),
            new_route(RegionId::new(1024, 3), Peer::empty(3), vec![Peer::empty(4)]),
        ];

        let listed = lister.list(1024, &region_routes).await.unwrap();
        assert_eq!(listed.len(), 3);

        // Exactly two requests must have been observed: one per leader datanode.
        let first = rx.try_recv().unwrap();
        let second = rx.try_recv().unwrap();
        rx.try_recv().unwrap_err();

        let (peer, request) = first;
        assert_eq!(peer.id, 1);
        assert_list_metadata_request(request, &[RegionId::new(1024, 1)]);

        let (peer, request) = second;
        assert_eq!(peer.id, 3);
        assert_list_metadata_request(request, &[RegionId::new(1024, 2), RegionId::new(1024, 3)]);
    }

    /// Verifies that the metadata returned by the datanodes is decoded and
    /// returned, including a `None` entry.
    #[tokio::test]
    async fn test_list_region_metadata() {
        let region_metadata =
            build_region_metadata(RegionId::new(1024, 1), &test_column_metadatas(&["tag_0"]));

        // Region 0 has no metadata on the mocked datanode; region 1 does.
        let stored = HashMap::from([
            (RegionId::new(1024, 0), None),
            (RegionId::new(1024, 1), Some(region_metadata.clone())),
        ]);
        let handler = ListMetadataDatanodeHandler::new(stored);
        let node_manager = Arc::new(MockDatanodeManager::new(handler));
        let lister = RegionMetadataLister::new(node_manager);

        let region_routes = vec![
            new_route(RegionId::new(1024, 0), Peer::empty(1), vec![]),
            new_route(RegionId::new(1024, 1), Peer::empty(3), vec![]),
        ];

        let listed = lister.list(1024, &region_routes).await.unwrap();
        assert_eq!(listed.len(), 2);
        assert_eq!(listed[0], None);
        assert_eq!(listed[1], Some(region_metadata));
    }
}