common_meta/ddl/allocator/
region_routes.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use common_telemetry::debug;
18use store_api::storage::{RegionId, RegionNumber, TableId};
19
20use crate::error::Result;
21use crate::peer::PeerAllocator;
22use crate::rpc::router::{Region, RegionRoute};
23
24pub type RegionRoutesAllocatorRef = Arc<dyn RegionRoutesAllocator>;
25
26#[async_trait::async_trait]
27pub trait RegionRoutesAllocator: Send + Sync {
28    async fn allocate(
29        &self,
30        table_id: TableId,
31        regions_and_partitions: &[(RegionNumber, &str)],
32    ) -> Result<Vec<RegionRoute>>;
33}
34
35#[async_trait::async_trait]
36impl<T: PeerAllocator> RegionRoutesAllocator for T {
37    async fn allocate(
38        &self,
39        table_id: TableId,
40        regions_and_partitions: &[(RegionNumber, &str)],
41    ) -> Result<Vec<RegionRoute>> {
42        let regions = regions_and_partitions.len().max(1);
43        let peers = self.alloc(regions).await?;
44        debug!("Allocated peers {:?} for table {}", peers, table_id,);
45
46        let mut region_routes = regions_and_partitions
47            .iter()
48            .enumerate()
49            .map(|(i, (region_number, partition))| {
50                let region = Region {
51                    id: RegionId::new(table_id, *region_number),
52                    partition_expr: partition.to_string(),
53                    ..Default::default()
54                };
55
56                let peer = peers[i % peers.len()].clone();
57
58                RegionRoute {
59                    region,
60                    leader_peer: Some(peer),
61                    ..Default::default()
62                }
63            })
64            .collect::<Vec<_>>();
65
66        // If the table has no partitions, we need to create a default region.
67        if region_routes.is_empty() {
68            region_routes.push(RegionRoute {
69                region: Region {
70                    id: RegionId::new(table_id, 0),
71                    ..Default::default()
72                },
73                leader_peer: Some(peers[0].clone()),
74                ..Default::default()
75            });
76        }
77
78        Ok(region_routes)
79    }
80}