common_meta/ddl/allocator/
region_routes.rs1use std::sync::Arc;
16
17use common_telemetry::debug;
18use store_api::storage::{RegionId, RegionNumber, TableId};
19
20use crate::error::Result;
21use crate::peer::{PeerAllocContext, PeerAllocator};
22use crate::rpc::router::{Region, RegionRoute};
23
24pub type RegionRoutesAllocatorRef = Arc<dyn RegionRoutesAllocator>;
25
26#[async_trait::async_trait]
27pub trait RegionRoutesAllocator: Send + Sync {
28 async fn allocate(
29 &self,
30 table_id: TableId,
31 regions_and_partitions: &[(RegionNumber, &str)],
32 ctx: &PeerAllocContext,
33 ) -> Result<Vec<RegionRoute>>;
34}
35
36#[async_trait::async_trait]
37impl<T: PeerAllocator> RegionRoutesAllocator for T {
38 async fn allocate(
39 &self,
40 table_id: TableId,
41 regions_and_partitions: &[(RegionNumber, &str)],
42 ctx: &PeerAllocContext,
43 ) -> Result<Vec<RegionRoute>> {
44 let regions = regions_and_partitions.len().max(1);
45 let peers = self.alloc(regions, ctx).await?;
46 debug!("Allocated peers {:?} for table {}", peers, table_id,);
47
48 let mut region_routes = regions_and_partitions
49 .iter()
50 .enumerate()
51 .map(|(i, (region_number, partition))| {
52 let region = Region {
53 id: RegionId::new(table_id, *region_number),
54 partition_expr: partition.to_string(),
55 ..Default::default()
56 };
57
58 let peer = peers[i % peers.len()].clone();
59
60 RegionRoute {
61 region,
62 leader_peer: Some(peer),
63 ..Default::default()
64 }
65 })
66 .collect::<Vec<_>>();
67
68 if region_routes.is_empty() {
70 region_routes.push(RegionRoute {
71 region: Region {
72 id: RegionId::new(table_id, 0),
73 ..Default::default()
74 },
75 leader_peer: Some(peers[0].clone()),
76 ..Default::default()
77 });
78 }
79
80 Ok(region_routes)
81 }
82}