Skip to main content

common_meta/ddl/allocator/
region_routes.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use common_telemetry::debug;
18use store_api::storage::{RegionId, RegionNumber, TableId};
19
20use crate::error::Result;
21use crate::peer::{PeerAllocContext, PeerAllocator};
22use crate::rpc::router::{Region, RegionRoute};
23
24pub type RegionRoutesAllocatorRef = Arc<dyn RegionRoutesAllocator>;
25
26#[async_trait::async_trait]
27pub trait RegionRoutesAllocator: Send + Sync {
28    async fn allocate(
29        &self,
30        table_id: TableId,
31        regions_and_partitions: &[(RegionNumber, &str)],
32        ctx: &PeerAllocContext,
33    ) -> Result<Vec<RegionRoute>>;
34}
35
36#[async_trait::async_trait]
37impl<T: PeerAllocator> RegionRoutesAllocator for T {
38    async fn allocate(
39        &self,
40        table_id: TableId,
41        regions_and_partitions: &[(RegionNumber, &str)],
42        ctx: &PeerAllocContext,
43    ) -> Result<Vec<RegionRoute>> {
44        let regions = regions_and_partitions.len().max(1);
45        let peers = self.alloc(regions, ctx).await?;
46        debug!("Allocated peers {:?} for table {}", peers, table_id,);
47
48        let mut region_routes = regions_and_partitions
49            .iter()
50            .enumerate()
51            .map(|(i, (region_number, partition))| {
52                let region = Region {
53                    id: RegionId::new(table_id, *region_number),
54                    partition_expr: partition.to_string(),
55                    ..Default::default()
56                };
57
58                let peer = peers[i % peers.len()].clone();
59
60                RegionRoute {
61                    region,
62                    leader_peer: Some(peer),
63                    ..Default::default()
64                }
65            })
66            .collect::<Vec<_>>();
67
68        // If the table has no partitions, we need to create a default region.
69        if region_routes.is_empty() {
70            region_routes.push(RegionRoute {
71                region: Region {
72                    id: RegionId::new(table_id, 0),
73                    ..Default::default()
74                },
75                leader_peer: Some(peers[0].clone()),
76                ..Default::default()
77            });
78        }
79
80        Ok(region_routes)
81    }
82}