common_meta/ddl/
table_meta.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::sync::Arc;
17
18use async_trait::async_trait;
19use common_telemetry::{debug, info};
20use snafu::ensure;
21use store_api::storage::{RegionId, RegionNumber, TableId};
22
23use crate::ddl::TableMetadata;
24use crate::error::{Result, UnsupportedSnafu};
25use crate::key::table_route::PhysicalTableRouteValue;
26use crate::peer::Peer;
27use crate::rpc::ddl::CreateTableTask;
28use crate::rpc::router::{Region, RegionRoute};
29use crate::sequence::SequenceRef;
30use crate::wal_options_allocator::{allocate_region_wal_options, WalOptionsAllocatorRef};
31
32pub type TableMetadataAllocatorRef = Arc<TableMetadataAllocator>;
33
34#[derive(Clone)]
35pub struct TableMetadataAllocator {
36    table_id_sequence: SequenceRef,
37    wal_options_allocator: WalOptionsAllocatorRef,
38    peer_allocator: PeerAllocatorRef,
39}
40
41impl TableMetadataAllocator {
42    pub fn new(
43        table_id_sequence: SequenceRef,
44        wal_options_allocator: WalOptionsAllocatorRef,
45    ) -> Self {
46        Self::with_peer_allocator(
47            table_id_sequence,
48            wal_options_allocator,
49            Arc::new(NoopPeerAllocator),
50        )
51    }
52
53    pub fn with_peer_allocator(
54        table_id_sequence: SequenceRef,
55        wal_options_allocator: WalOptionsAllocatorRef,
56        peer_allocator: PeerAllocatorRef,
57    ) -> Self {
58        Self {
59            table_id_sequence,
60            wal_options_allocator,
61            peer_allocator,
62        }
63    }
64
65    pub(crate) async fn allocate_table_id(
66        &self,
67        table_id: &Option<api::v1::TableId>,
68    ) -> Result<TableId> {
69        let table_id = if let Some(table_id) = table_id {
70            let table_id = table_id.id;
71
72            ensure!(
73                !self
74                    .table_id_sequence
75                    .min_max()
76                    .await
77                    .contains(&(table_id as u64)),
78                UnsupportedSnafu {
79                    operation: format!(
80                        "create table by id {} that is reserved in this node",
81                        table_id
82                    )
83                }
84            );
85
86            info!(
87                "Received explicitly allocated table id {}, will use it directly.",
88                table_id
89            );
90
91            table_id
92        } else {
93            self.table_id_sequence.next().await? as TableId
94        };
95        Ok(table_id)
96    }
97
98    fn create_wal_options(
99        &self,
100        table_route: &PhysicalTableRouteValue,
101        skip_wal: bool,
102    ) -> Result<HashMap<RegionNumber, String>> {
103        let region_numbers = table_route
104            .region_routes
105            .iter()
106            .map(|route| route.region.id.region_number())
107            .collect();
108        allocate_region_wal_options(region_numbers, &self.wal_options_allocator, skip_wal)
109    }
110
111    async fn create_table_route(
112        &self,
113        table_id: TableId,
114        task: &CreateTableTask,
115    ) -> Result<PhysicalTableRouteValue> {
116        let regions = task.partitions.len().max(1);
117        let peers = self.peer_allocator.alloc(regions).await?;
118        debug!("Allocated peers {:?} for table {}", peers, table_id);
119
120        let mut region_routes = task
121            .partitions
122            .iter()
123            .enumerate()
124            .map(|(i, partition)| {
125                let region = Region {
126                    id: RegionId::new(table_id, i as u32),
127                    partition_expr: partition.expression.clone(),
128                    ..Default::default()
129                };
130
131                let peer = peers[i % peers.len()].clone();
132
133                RegionRoute {
134                    region,
135                    leader_peer: Some(peer),
136                    ..Default::default()
137                }
138            })
139            .collect::<Vec<_>>();
140
141        // If the table has no partitions, we need to create a default region.
142        if region_routes.is_empty() {
143            region_routes.push(RegionRoute {
144                region: Region {
145                    id: RegionId::new(table_id, 0),
146                    ..Default::default()
147                },
148                leader_peer: Some(peers[0].clone()),
149                ..Default::default()
150            });
151        }
152
153        Ok(PhysicalTableRouteValue::new(region_routes))
154    }
155
156    /// Create VIEW metadata
157    pub async fn create_view(&self, table_id: &Option<api::v1::TableId>) -> Result<TableMetadata> {
158        let table_id = self.allocate_table_id(table_id).await?;
159
160        Ok(TableMetadata {
161            table_id,
162            ..Default::default()
163        })
164    }
165
166    pub async fn create(&self, task: &CreateTableTask) -> Result<TableMetadata> {
167        let table_id = self.allocate_table_id(&task.create_table.table_id).await?;
168        let table_route = self.create_table_route(table_id, task).await?;
169
170        let region_wal_options =
171            self.create_wal_options(&table_route, task.table_info.meta.options.skip_wal)?;
172
173        debug!(
174            "Allocated region wal options {:?} for table {}",
175            region_wal_options, table_id
176        );
177
178        Ok(TableMetadata {
179            table_id,
180            table_route,
181            region_wal_options,
182        })
183    }
184
185    pub fn table_id_sequence(&self) -> SequenceRef {
186        self.table_id_sequence.clone()
187    }
188}
189
190pub type PeerAllocatorRef = Arc<dyn PeerAllocator>;
191
192/// [`PeerAllocator`] allocates [`Peer`]s for creating regions.
193#[async_trait]
194pub trait PeerAllocator: Send + Sync {
195    /// Allocates `regions` size [`Peer`]s.
196    async fn alloc(&self, regions: usize) -> Result<Vec<Peer>>;
197}
198
199struct NoopPeerAllocator;
200
201#[async_trait]
202impl PeerAllocator for NoopPeerAllocator {
203    async fn alloc(&self, regions: usize) -> Result<Vec<Peer>> {
204        Ok(vec![Peer::default(); regions])
205    }
206}