common_meta/ddl/
table_meta.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::sync::Arc;
17
18use common_telemetry::{debug, info};
19use snafu::ensure;
20use store_api::storage::{RegionId, RegionNumber, TableId};
21
22use crate::ddl::TableMetadata;
23use crate::error::{Result, UnsupportedSnafu};
24use crate::key::table_route::PhysicalTableRouteValue;
25use crate::peer::{NoopPeerAllocator, PeerAllocatorRef};
26use crate::rpc::ddl::CreateTableTask;
27use crate::rpc::router::{Region, RegionRoute};
28use crate::sequence::SequenceRef;
29use crate::wal_options_allocator::{WalOptionsAllocatorRef, allocate_region_wal_options};
30
31pub type TableMetadataAllocatorRef = Arc<TableMetadataAllocator>;
32
33#[derive(Clone)]
34pub struct TableMetadataAllocator {
35    table_id_sequence: SequenceRef,
36    wal_options_allocator: WalOptionsAllocatorRef,
37    peer_allocator: PeerAllocatorRef,
38}
39
40impl TableMetadataAllocator {
41    pub fn new(
42        table_id_sequence: SequenceRef,
43        wal_options_allocator: WalOptionsAllocatorRef,
44    ) -> Self {
45        Self::with_peer_allocator(
46            table_id_sequence,
47            wal_options_allocator,
48            Arc::new(NoopPeerAllocator),
49        )
50    }
51
52    pub fn with_peer_allocator(
53        table_id_sequence: SequenceRef,
54        wal_options_allocator: WalOptionsAllocatorRef,
55        peer_allocator: PeerAllocatorRef,
56    ) -> Self {
57        Self {
58            table_id_sequence,
59            wal_options_allocator,
60            peer_allocator,
61        }
62    }
63
64    pub(crate) async fn allocate_table_id(
65        &self,
66        table_id: &Option<api::v1::TableId>,
67    ) -> Result<TableId> {
68        let table_id = if let Some(table_id) = table_id {
69            let table_id = table_id.id;
70
71            ensure!(
72                !self
73                    .table_id_sequence
74                    .min_max()
75                    .await
76                    .contains(&(table_id as u64)),
77                UnsupportedSnafu {
78                    operation: format!(
79                        "create table by id {} that is reserved in this node",
80                        table_id
81                    )
82                }
83            );
84
85            info!(
86                "Received explicitly allocated table id {}, will use it directly.",
87                table_id
88            );
89
90            table_id
91        } else {
92            self.table_id_sequence.next().await? as TableId
93        };
94        Ok(table_id)
95    }
96
97    fn create_wal_options(
98        &self,
99        table_route: &PhysicalTableRouteValue,
100        skip_wal: bool,
101    ) -> Result<HashMap<RegionNumber, String>> {
102        let region_numbers = table_route
103            .region_routes
104            .iter()
105            .map(|route| route.region.id.region_number())
106            .collect();
107        allocate_region_wal_options(region_numbers, &self.wal_options_allocator, skip_wal)
108    }
109
110    async fn create_table_route(
111        &self,
112        table_id: TableId,
113        task: &CreateTableTask,
114    ) -> Result<PhysicalTableRouteValue> {
115        let regions = task.partitions.len().max(1);
116        let peers = self.peer_allocator.alloc(regions).await?;
117        debug!("Allocated peers {:?} for table {}", peers, table_id);
118
119        let mut region_routes = task
120            .partitions
121            .iter()
122            .enumerate()
123            .map(|(i, partition)| {
124                let region = Region {
125                    id: RegionId::new(table_id, i as u32),
126                    partition_expr: partition.expression.clone(),
127                    ..Default::default()
128                };
129
130                let peer = peers[i % peers.len()].clone();
131
132                RegionRoute {
133                    region,
134                    leader_peer: Some(peer),
135                    ..Default::default()
136                }
137            })
138            .collect::<Vec<_>>();
139
140        // If the table has no partitions, we need to create a default region.
141        if region_routes.is_empty() {
142            region_routes.push(RegionRoute {
143                region: Region {
144                    id: RegionId::new(table_id, 0),
145                    ..Default::default()
146                },
147                leader_peer: Some(peers[0].clone()),
148                ..Default::default()
149            });
150        }
151
152        Ok(PhysicalTableRouteValue::new(region_routes))
153    }
154
155    /// Create VIEW metadata
156    pub async fn create_view(&self, table_id: &Option<api::v1::TableId>) -> Result<TableMetadata> {
157        let table_id = self.allocate_table_id(table_id).await?;
158
159        Ok(TableMetadata {
160            table_id,
161            ..Default::default()
162        })
163    }
164
165    pub async fn create(&self, task: &CreateTableTask) -> Result<TableMetadata> {
166        let table_id = self.allocate_table_id(&task.create_table.table_id).await?;
167        let table_route = self.create_table_route(table_id, task).await?;
168
169        let region_wal_options =
170            self.create_wal_options(&table_route, task.table_info.meta.options.skip_wal)?;
171
172        debug!(
173            "Allocated region wal options {:?} for table {}",
174            region_wal_options, table_id
175        );
176
177        Ok(TableMetadata {
178            table_id,
179            table_route,
180            region_wal_options,
181        })
182    }
183
184    pub fn table_id_sequence(&self) -> SequenceRef {
185        self.table_id_sequence.clone()
186    }
187}