common_meta/ddl/
table_meta.rs1use std::collections::HashMap;
16use std::sync::Arc;
17
18use async_trait::async_trait;
19use common_telemetry::{debug, info};
20use snafu::ensure;
21use store_api::storage::{RegionId, RegionNumber, TableId};
22
23use crate::ddl::TableMetadata;
24use crate::error::{Result, UnsupportedSnafu};
25use crate::key::table_route::PhysicalTableRouteValue;
26use crate::peer::Peer;
27use crate::rpc::ddl::CreateTableTask;
28use crate::rpc::router::{Region, RegionRoute};
29use crate::sequence::SequenceRef;
30use crate::wal_options_allocator::{allocate_region_wal_options, WalOptionsAllocatorRef};
31
/// Shared, reference-counted handle to a [`TableMetadataAllocator`].
pub type TableMetadataAllocatorRef = Arc<TableMetadataAllocator>;
33
/// Allocates the metadata needed to create a table: its id, its region
/// routes, and the WAL options of each region.
#[derive(Clone)]
pub struct TableMetadataAllocator {
    /// Source of new table ids; also consulted (via `min_max()`) to reject
    /// explicitly requested ids.
    table_id_sequence: SequenceRef,
    /// Passed to `allocate_region_wal_options` when building per-region WAL options.
    wal_options_allocator: WalOptionsAllocatorRef,
    /// Supplies the peers that become region leaders in a new table route.
    peer_allocator: PeerAllocatorRef,
}
40
41impl TableMetadataAllocator {
42 pub fn new(
43 table_id_sequence: SequenceRef,
44 wal_options_allocator: WalOptionsAllocatorRef,
45 ) -> Self {
46 Self::with_peer_allocator(
47 table_id_sequence,
48 wal_options_allocator,
49 Arc::new(NoopPeerAllocator),
50 )
51 }
52
53 pub fn with_peer_allocator(
54 table_id_sequence: SequenceRef,
55 wal_options_allocator: WalOptionsAllocatorRef,
56 peer_allocator: PeerAllocatorRef,
57 ) -> Self {
58 Self {
59 table_id_sequence,
60 wal_options_allocator,
61 peer_allocator,
62 }
63 }
64
65 pub(crate) async fn allocate_table_id(
66 &self,
67 table_id: &Option<api::v1::TableId>,
68 ) -> Result<TableId> {
69 let table_id = if let Some(table_id) = table_id {
70 let table_id = table_id.id;
71
72 ensure!(
73 !self
74 .table_id_sequence
75 .min_max()
76 .await
77 .contains(&(table_id as u64)),
78 UnsupportedSnafu {
79 operation: format!(
80 "create table by id {} that is reserved in this node",
81 table_id
82 )
83 }
84 );
85
86 info!(
87 "Received explicitly allocated table id {}, will use it directly.",
88 table_id
89 );
90
91 table_id
92 } else {
93 self.table_id_sequence.next().await? as TableId
94 };
95 Ok(table_id)
96 }
97
98 fn create_wal_options(
99 &self,
100 table_route: &PhysicalTableRouteValue,
101 skip_wal: bool,
102 ) -> Result<HashMap<RegionNumber, String>> {
103 let region_numbers = table_route
104 .region_routes
105 .iter()
106 .map(|route| route.region.id.region_number())
107 .collect();
108 allocate_region_wal_options(region_numbers, &self.wal_options_allocator, skip_wal)
109 }
110
111 async fn create_table_route(
112 &self,
113 table_id: TableId,
114 task: &CreateTableTask,
115 ) -> Result<PhysicalTableRouteValue> {
116 let regions = task.partitions.len().max(1);
117 let peers = self.peer_allocator.alloc(regions).await?;
118 debug!("Allocated peers {:?} for table {}", peers, table_id);
119
120 let mut region_routes = task
121 .partitions
122 .iter()
123 .enumerate()
124 .map(|(i, partition)| {
125 let region = Region {
126 id: RegionId::new(table_id, i as u32),
127 partition_expr: partition.expression.clone(),
128 ..Default::default()
129 };
130
131 let peer = peers[i % peers.len()].clone();
132
133 RegionRoute {
134 region,
135 leader_peer: Some(peer),
136 ..Default::default()
137 }
138 })
139 .collect::<Vec<_>>();
140
141 if region_routes.is_empty() {
143 region_routes.push(RegionRoute {
144 region: Region {
145 id: RegionId::new(table_id, 0),
146 ..Default::default()
147 },
148 leader_peer: Some(peers[0].clone()),
149 ..Default::default()
150 });
151 }
152
153 Ok(PhysicalTableRouteValue::new(region_routes))
154 }
155
156 pub async fn create_view(&self, table_id: &Option<api::v1::TableId>) -> Result<TableMetadata> {
158 let table_id = self.allocate_table_id(table_id).await?;
159
160 Ok(TableMetadata {
161 table_id,
162 ..Default::default()
163 })
164 }
165
166 pub async fn create(&self, task: &CreateTableTask) -> Result<TableMetadata> {
167 let table_id = self.allocate_table_id(&task.create_table.table_id).await?;
168 let table_route = self.create_table_route(table_id, task).await?;
169
170 let region_wal_options =
171 self.create_wal_options(&table_route, task.table_info.meta.options.skip_wal)?;
172
173 debug!(
174 "Allocated region wal options {:?} for table {}",
175 region_wal_options, table_id
176 );
177
178 Ok(TableMetadata {
179 table_id,
180 table_route,
181 region_wal_options,
182 })
183 }
184
185 pub fn table_id_sequence(&self) -> SequenceRef {
186 self.table_id_sequence.clone()
187 }
188}
189
/// Shared, reference-counted handle to a [`PeerAllocator`] trait object.
pub type PeerAllocatorRef = Arc<dyn PeerAllocator>;
191
/// Supplies the [`Peer`]s that become leaders of a new table's regions.
#[async_trait]
pub trait PeerAllocator: Send + Sync {
    /// Allocates peers for `regions` regions.
    ///
    /// `regions` is the number of regions that need a leader; the returned
    /// peers are assigned to regions by the caller.
    async fn alloc(&self, regions: usize) -> Result<Vec<Peer>>;
}
198
199struct NoopPeerAllocator;
200
201#[async_trait]
202impl PeerAllocator for NoopPeerAllocator {
203 async fn alloc(&self, regions: usize) -> Result<Vec<Peer>> {
204 Ok(vec![Peer::default(); regions])
205 }
206}