common_meta/ddl/allocator/
region_routes.rs1use std::sync::Arc;
16
17use common_telemetry::debug;
18use store_api::storage::{RegionId, RegionNumber, TableId};
19
20use crate::error::Result;
21use crate::peer::PeerAllocator;
22use crate::rpc::router::{Region, RegionRoute};
23
24pub type RegionRoutesAllocatorRef = Arc<dyn RegionRoutesAllocator>;
25
26#[async_trait::async_trait]
27pub trait RegionRoutesAllocator: Send + Sync {
28 async fn allocate(
29 &self,
30 table_id: TableId,
31 regions_and_partitions: &[(RegionNumber, &str)],
32 ) -> Result<Vec<RegionRoute>>;
33}
34
35#[async_trait::async_trait]
36impl<T: PeerAllocator> RegionRoutesAllocator for T {
37 async fn allocate(
38 &self,
39 table_id: TableId,
40 regions_and_partitions: &[(RegionNumber, &str)],
41 ) -> Result<Vec<RegionRoute>> {
42 let regions = regions_and_partitions.len().max(1);
43 let peers = self.alloc(regions).await?;
44 debug!("Allocated peers {:?} for table {}", peers, table_id,);
45
46 let mut region_routes = regions_and_partitions
47 .iter()
48 .enumerate()
49 .map(|(i, (region_number, partition))| {
50 let region = Region {
51 id: RegionId::new(table_id, *region_number),
52 partition_expr: partition.to_string(),
53 ..Default::default()
54 };
55
56 let peer = peers[i % peers.len()].clone();
57
58 RegionRoute {
59 region,
60 leader_peer: Some(peer),
61 ..Default::default()
62 }
63 })
64 .collect::<Vec<_>>();
65
66 if region_routes.is_empty() {
68 region_routes.push(RegionRoute {
69 region: Region {
70 id: RegionId::new(table_id, 0),
71 ..Default::default()
72 },
73 leader_peer: Some(peers[0].clone()),
74 ..Default::default()
75 });
76 }
77
78 Ok(region_routes)
79 }
80}