common_meta/ddl/
table_meta.rs1use std::collections::HashMap;
16use std::sync::Arc;
17
18use common_telemetry::{debug, info};
19use snafu::ensure;
20use store_api::storage::{RegionNumber, TableId};
21
22use crate::ddl::TableMetadata;
23use crate::ddl::allocator::region_routes::RegionRoutesAllocatorRef;
24use crate::ddl::allocator::resource_id::ResourceIdAllocatorRef;
25use crate::ddl::allocator::wal_options::WalOptionsAllocatorRef;
26use crate::error::{Result, UnsupportedSnafu};
27use crate::key::table_route::PhysicalTableRouteValue;
28use crate::peer::{NoopPeerAllocator, PeerAllocatorRef};
29use crate::rpc::ddl::CreateTableTask;
30
31pub type TableMetadataAllocatorRef = Arc<TableMetadataAllocator>;
32
33#[derive(Clone)]
34pub struct TableMetadataAllocator {
35 table_id_allocator: ResourceIdAllocatorRef,
36 wal_options_allocator: WalOptionsAllocatorRef,
37 region_routes_allocator: RegionRoutesAllocatorRef,
38}
39
40impl TableMetadataAllocator {
41 pub fn new(
42 table_id_allocator: ResourceIdAllocatorRef,
43 wal_options_allocator: WalOptionsAllocatorRef,
44 ) -> Self {
45 Self::with_peer_allocator(
46 table_id_allocator,
47 wal_options_allocator,
48 Arc::new(NoopPeerAllocator),
49 )
50 }
51
52 pub fn with_peer_allocator(
53 table_id_allocator: ResourceIdAllocatorRef,
54 wal_options_allocator: WalOptionsAllocatorRef,
55 peer_allocator: PeerAllocatorRef,
56 ) -> Self {
57 Self {
58 table_id_allocator,
59 wal_options_allocator,
60 region_routes_allocator: Arc::new(peer_allocator) as _,
61 }
62 }
63
64 pub(crate) async fn allocate_table_id(
65 &self,
66 table_id: &Option<api::v1::TableId>,
67 ) -> Result<TableId> {
68 let table_id = if let Some(table_id) = table_id {
69 let table_id = table_id.id;
70
71 ensure!(
72 !self
73 .table_id_allocator
74 .min_max()
75 .await
76 .contains(&(table_id as u64)),
77 UnsupportedSnafu {
78 operation: format!(
79 "create table by id {} that is reserved in this node",
80 table_id
81 )
82 }
83 );
84
85 info!(
86 "Received explicitly allocated table id {}, will use it directly.",
87 table_id
88 );
89
90 table_id
91 } else {
92 self.table_id_allocator.next().await? as TableId
93 };
94 Ok(table_id)
95 }
96
97 async fn create_wal_options(
98 &self,
99 region_numbers: &[RegionNumber],
100 skip_wal: bool,
101 ) -> Result<HashMap<RegionNumber, String>> {
102 self.wal_options_allocator
103 .allocate(region_numbers, skip_wal)
104 .await
105 }
106
107 async fn create_table_route(
108 &self,
109 table_id: TableId,
110 partition_exprs: &[&str],
111 ) -> Result<PhysicalTableRouteValue> {
112 let region_number_and_partition_exprs = partition_exprs
113 .iter()
114 .enumerate()
115 .map(|(i, partition)| (i as u32, *partition))
116 .collect::<Vec<_>>();
117 let region_routes = self
118 .region_routes_allocator
119 .allocate(table_id, ®ion_number_and_partition_exprs)
120 .await?;
121
122 Ok(PhysicalTableRouteValue::new(region_routes))
123 }
124
125 pub async fn create_view(&self, table_id: &Option<api::v1::TableId>) -> Result<TableMetadata> {
127 let table_id = self.allocate_table_id(table_id).await?;
128
129 Ok(TableMetadata {
130 table_id,
131 ..Default::default()
132 })
133 }
134
135 pub async fn create(&self, task: &CreateTableTask) -> Result<TableMetadata> {
136 let table_id = self.allocate_table_id(&task.create_table.table_id).await?;
137 let partition_exprs = task
138 .partitions
139 .iter()
140 .map(|p| p.expression.as_str())
141 .collect::<Vec<_>>();
142 let table_route = self.create_table_route(table_id, &partition_exprs).await?;
143 let region_numbers = table_route
144 .region_routes
145 .iter()
146 .map(|route| route.region.id.region_number())
147 .collect::<Vec<_>>();
148 let region_wal_options = self
149 .create_wal_options(®ion_numbers, task.table_info.meta.options.skip_wal)
150 .await?;
151
152 debug!(
153 "Allocated region wal options {:?} for table {}",
154 region_wal_options, table_id
155 );
156
157 Ok(TableMetadata {
158 table_id,
159 table_route,
160 region_wal_options,
161 })
162 }
163
164 pub fn table_id_allocator(&self) -> ResourceIdAllocatorRef {
165 self.table_id_allocator.clone()
166 }
167}