common_meta/ddl/
table_meta.rs1use std::collections::HashMap;
16use std::sync::Arc;
17
18use async_trait::async_trait;
19use common_telemetry::{debug, info};
20use snafu::ensure;
21use store_api::storage::{RegionId, RegionNumber, TableId};
22
23use crate::ddl::TableMetadata;
24use crate::error::{self, Result, UnsupportedSnafu};
25use crate::key::table_route::PhysicalTableRouteValue;
26use crate::peer::Peer;
27use crate::rpc::ddl::CreateTableTask;
28use crate::rpc::router::{Region, RegionRoute};
29use crate::sequence::SequenceRef;
30use crate::wal_options_allocator::{allocate_region_wal_options, WalOptionsAllocatorRef};
31
32pub type TableMetadataAllocatorRef = Arc<TableMetadataAllocator>;
33
34#[derive(Clone)]
35pub struct TableMetadataAllocator {
36 table_id_sequence: SequenceRef,
37 wal_options_allocator: WalOptionsAllocatorRef,
38 peer_allocator: PeerAllocatorRef,
39}
40
41impl TableMetadataAllocator {
42 pub fn new(
43 table_id_sequence: SequenceRef,
44 wal_options_allocator: WalOptionsAllocatorRef,
45 ) -> Self {
46 Self::with_peer_allocator(
47 table_id_sequence,
48 wal_options_allocator,
49 Arc::new(NoopPeerAllocator),
50 )
51 }
52
53 pub fn with_peer_allocator(
54 table_id_sequence: SequenceRef,
55 wal_options_allocator: WalOptionsAllocatorRef,
56 peer_allocator: PeerAllocatorRef,
57 ) -> Self {
58 Self {
59 table_id_sequence,
60 wal_options_allocator,
61 peer_allocator,
62 }
63 }
64
65 pub(crate) async fn allocate_table_id(
66 &self,
67 table_id: &Option<api::v1::TableId>,
68 ) -> Result<TableId> {
69 let table_id = if let Some(table_id) = table_id {
70 let table_id = table_id.id;
71
72 ensure!(
73 !self
74 .table_id_sequence
75 .min_max()
76 .await
77 .contains(&(table_id as u64)),
78 UnsupportedSnafu {
79 operation: format!(
80 "create table by id {} that is reserved in this node",
81 table_id
82 )
83 }
84 );
85
86 info!(
87 "Received explicitly allocated table id {}, will use it directly.",
88 table_id
89 );
90
91 table_id
92 } else {
93 self.table_id_sequence.next().await? as TableId
94 };
95 Ok(table_id)
96 }
97
98 fn create_wal_options(
99 &self,
100 table_route: &PhysicalTableRouteValue,
101 skip_wal: bool,
102 ) -> Result<HashMap<RegionNumber, String>> {
103 let region_numbers = table_route
104 .region_routes
105 .iter()
106 .map(|route| route.region.id.region_number())
107 .collect();
108 allocate_region_wal_options(region_numbers, &self.wal_options_allocator, skip_wal)
109 }
110
111 async fn create_table_route(
112 &self,
113 table_id: TableId,
114 task: &CreateTableTask,
115 ) -> Result<PhysicalTableRouteValue> {
116 let regions = task.partitions.len();
117 ensure!(
118 regions > 0,
119 error::UnexpectedSnafu {
120 err_msg: "The number of partitions must be greater than 0"
121 }
122 );
123
124 let peers = self.peer_allocator.alloc(regions).await?;
125 let region_routes = task
126 .partitions
127 .iter()
128 .enumerate()
129 .map(|(i, partition)| {
130 let region = Region {
131 id: RegionId::new(table_id, i as u32),
132 partition: Some(partition.clone().into()),
133 ..Default::default()
134 };
135
136 let peer = peers[i % peers.len()].clone();
137
138 RegionRoute {
139 region,
140 leader_peer: Some(peer),
141 ..Default::default()
142 }
143 })
144 .collect::<Vec<_>>();
145
146 Ok(PhysicalTableRouteValue::new(region_routes))
147 }
148
149 pub async fn create_view(&self, table_id: &Option<api::v1::TableId>) -> Result<TableMetadata> {
151 let table_id = self.allocate_table_id(table_id).await?;
152
153 Ok(TableMetadata {
154 table_id,
155 ..Default::default()
156 })
157 }
158
159 pub async fn create(&self, task: &CreateTableTask) -> Result<TableMetadata> {
160 let table_id = self.allocate_table_id(&task.create_table.table_id).await?;
161 let table_route = self.create_table_route(table_id, task).await?;
162
163 let region_wal_options =
164 self.create_wal_options(&table_route, task.table_info.meta.options.skip_wal)?;
165
166 debug!(
167 "Allocated region wal options {:?} for table {}",
168 region_wal_options, table_id
169 );
170
171 Ok(TableMetadata {
172 table_id,
173 table_route,
174 region_wal_options,
175 })
176 }
177}
178
179pub type PeerAllocatorRef = Arc<dyn PeerAllocator>;
180
181#[async_trait]
183pub trait PeerAllocator: Send + Sync {
184 async fn alloc(&self, regions: usize) -> Result<Vec<Peer>>;
186}
187
188struct NoopPeerAllocator;
189
190#[async_trait]
191impl PeerAllocator for NoopPeerAllocator {
192 async fn alloc(&self, regions: usize) -> Result<Vec<Peer>> {
193 Ok(vec![Peer::default(); regions])
194 }
195}