common_meta/ddl/
table_meta.rs1use std::collections::HashMap;
16use std::sync::Arc;
17
18use common_telemetry::{debug, info};
19use snafu::ensure;
20use store_api::storage::{RegionId, RegionNumber, TableId};
21
22use crate::ddl::TableMetadata;
23use crate::error::{Result, UnsupportedSnafu};
24use crate::key::table_route::PhysicalTableRouteValue;
25use crate::peer::{NoopPeerAllocator, PeerAllocatorRef};
26use crate::rpc::ddl::CreateTableTask;
27use crate::rpc::router::{Region, RegionRoute};
28use crate::sequence::SequenceRef;
29use crate::wal_options_allocator::{WalOptionsAllocatorRef, allocate_region_wal_options};
30
31pub type TableMetadataAllocatorRef = Arc<TableMetadataAllocator>;
32
33#[derive(Clone)]
34pub struct TableMetadataAllocator {
35 table_id_sequence: SequenceRef,
36 wal_options_allocator: WalOptionsAllocatorRef,
37 peer_allocator: PeerAllocatorRef,
38}
39
40impl TableMetadataAllocator {
41 pub fn new(
42 table_id_sequence: SequenceRef,
43 wal_options_allocator: WalOptionsAllocatorRef,
44 ) -> Self {
45 Self::with_peer_allocator(
46 table_id_sequence,
47 wal_options_allocator,
48 Arc::new(NoopPeerAllocator),
49 )
50 }
51
52 pub fn with_peer_allocator(
53 table_id_sequence: SequenceRef,
54 wal_options_allocator: WalOptionsAllocatorRef,
55 peer_allocator: PeerAllocatorRef,
56 ) -> Self {
57 Self {
58 table_id_sequence,
59 wal_options_allocator,
60 peer_allocator,
61 }
62 }
63
64 pub(crate) async fn allocate_table_id(
65 &self,
66 table_id: &Option<api::v1::TableId>,
67 ) -> Result<TableId> {
68 let table_id = if let Some(table_id) = table_id {
69 let table_id = table_id.id;
70
71 ensure!(
72 !self
73 .table_id_sequence
74 .min_max()
75 .await
76 .contains(&(table_id as u64)),
77 UnsupportedSnafu {
78 operation: format!(
79 "create table by id {} that is reserved in this node",
80 table_id
81 )
82 }
83 );
84
85 info!(
86 "Received explicitly allocated table id {}, will use it directly.",
87 table_id
88 );
89
90 table_id
91 } else {
92 self.table_id_sequence.next().await? as TableId
93 };
94 Ok(table_id)
95 }
96
97 fn create_wal_options(
98 &self,
99 table_route: &PhysicalTableRouteValue,
100 skip_wal: bool,
101 ) -> Result<HashMap<RegionNumber, String>> {
102 let region_numbers = table_route
103 .region_routes
104 .iter()
105 .map(|route| route.region.id.region_number())
106 .collect();
107 allocate_region_wal_options(region_numbers, &self.wal_options_allocator, skip_wal)
108 }
109
110 async fn create_table_route(
111 &self,
112 table_id: TableId,
113 task: &CreateTableTask,
114 ) -> Result<PhysicalTableRouteValue> {
115 let regions = task.partitions.len().max(1);
116 let peers = self.peer_allocator.alloc(regions).await?;
117 debug!("Allocated peers {:?} for table {}", peers, table_id);
118
119 let mut region_routes = task
120 .partitions
121 .iter()
122 .enumerate()
123 .map(|(i, partition)| {
124 let region = Region {
125 id: RegionId::new(table_id, i as u32),
126 partition_expr: partition.expression.clone(),
127 ..Default::default()
128 };
129
130 let peer = peers[i % peers.len()].clone();
131
132 RegionRoute {
133 region,
134 leader_peer: Some(peer),
135 ..Default::default()
136 }
137 })
138 .collect::<Vec<_>>();
139
140 if region_routes.is_empty() {
142 region_routes.push(RegionRoute {
143 region: Region {
144 id: RegionId::new(table_id, 0),
145 ..Default::default()
146 },
147 leader_peer: Some(peers[0].clone()),
148 ..Default::default()
149 });
150 }
151
152 Ok(PhysicalTableRouteValue::new(region_routes))
153 }
154
155 pub async fn create_view(&self, table_id: &Option<api::v1::TableId>) -> Result<TableMetadata> {
157 let table_id = self.allocate_table_id(table_id).await?;
158
159 Ok(TableMetadata {
160 table_id,
161 ..Default::default()
162 })
163 }
164
165 pub async fn create(&self, task: &CreateTableTask) -> Result<TableMetadata> {
166 let table_id = self.allocate_table_id(&task.create_table.table_id).await?;
167 let table_route = self.create_table_route(table_id, task).await?;
168
169 let region_wal_options =
170 self.create_wal_options(&table_route, task.table_info.meta.options.skip_wal)?;
171
172 debug!(
173 "Allocated region wal options {:?} for table {}",
174 region_wal_options, table_id
175 );
176
177 Ok(TableMetadata {
178 table_id,
179 table_route,
180 region_wal_options,
181 })
182 }
183
184 pub fn table_id_sequence(&self) -> SequenceRef {
185 self.table_id_sequence.clone()
186 }
187}