common_meta/ddl/
table_meta.rs1use std::collections::HashMap;
16use std::sync::Arc;
17
18use async_trait::async_trait;
19use common_telemetry::{debug, info};
20use snafu::ensure;
21use store_api::storage::{RegionId, RegionNumber, TableId};
22
23use crate::ddl::TableMetadata;
24use crate::error::{self, Result, UnsupportedSnafu};
25use crate::key::table_route::PhysicalTableRouteValue;
26use crate::peer::Peer;
27use crate::rpc::ddl::CreateTableTask;
28use crate::rpc::router::{Region, RegionRoute};
29use crate::sequence::SequenceRef;
30use crate::wal_options_allocator::{allocate_region_wal_options, WalOptionsAllocatorRef};
31
32pub type TableMetadataAllocatorRef = Arc<TableMetadataAllocator>;
33
34#[derive(Clone)]
35pub struct TableMetadataAllocator {
36 table_id_sequence: SequenceRef,
37 wal_options_allocator: WalOptionsAllocatorRef,
38 peer_allocator: PeerAllocatorRef,
39}
40
41impl TableMetadataAllocator {
42 pub fn new(
43 table_id_sequence: SequenceRef,
44 wal_options_allocator: WalOptionsAllocatorRef,
45 ) -> Self {
46 Self::with_peer_allocator(
47 table_id_sequence,
48 wal_options_allocator,
49 Arc::new(NoopPeerAllocator),
50 )
51 }
52
53 pub fn with_peer_allocator(
54 table_id_sequence: SequenceRef,
55 wal_options_allocator: WalOptionsAllocatorRef,
56 peer_allocator: PeerAllocatorRef,
57 ) -> Self {
58 Self {
59 table_id_sequence,
60 wal_options_allocator,
61 peer_allocator,
62 }
63 }
64
65 pub(crate) async fn allocate_table_id(
66 &self,
67 table_id: &Option<api::v1::TableId>,
68 ) -> Result<TableId> {
69 let table_id = if let Some(table_id) = table_id {
70 let table_id = table_id.id;
71
72 ensure!(
73 !self
74 .table_id_sequence
75 .min_max()
76 .await
77 .contains(&(table_id as u64)),
78 UnsupportedSnafu {
79 operation: format!(
80 "create table by id {} that is reserved in this node",
81 table_id
82 )
83 }
84 );
85
86 info!(
87 "Received explicitly allocated table id {}, will use it directly.",
88 table_id
89 );
90
91 table_id
92 } else {
93 self.table_id_sequence.next().await? as TableId
94 };
95 Ok(table_id)
96 }
97
98 fn create_wal_options(
99 &self,
100 table_route: &PhysicalTableRouteValue,
101 skip_wal: bool,
102 ) -> Result<HashMap<RegionNumber, String>> {
103 let region_numbers = table_route
104 .region_routes
105 .iter()
106 .map(|route| route.region.id.region_number())
107 .collect();
108 allocate_region_wal_options(region_numbers, &self.wal_options_allocator, skip_wal)
109 }
110
111 async fn create_table_route(
112 &self,
113 table_id: TableId,
114 task: &CreateTableTask,
115 ) -> Result<PhysicalTableRouteValue> {
116 let regions = task.partitions.len();
117 ensure!(
118 regions > 0,
119 error::UnexpectedSnafu {
120 err_msg: "The number of partitions must be greater than 0"
121 }
122 );
123
124 let peers = self.peer_allocator.alloc(regions).await?;
125 debug!("Allocated peers {:?} for table {}", peers, table_id);
126 let region_routes = task
127 .partitions
128 .iter()
129 .enumerate()
130 .map(|(i, partition)| {
131 let region = Region {
132 id: RegionId::new(table_id, i as u32),
133 partition: Some(partition.clone().into()),
134 ..Default::default()
135 };
136
137 let peer = peers[i % peers.len()].clone();
138
139 RegionRoute {
140 region,
141 leader_peer: Some(peer),
142 ..Default::default()
143 }
144 })
145 .collect::<Vec<_>>();
146
147 Ok(PhysicalTableRouteValue::new(region_routes))
148 }
149
150 pub async fn create_view(&self, table_id: &Option<api::v1::TableId>) -> Result<TableMetadata> {
152 let table_id = self.allocate_table_id(table_id).await?;
153
154 Ok(TableMetadata {
155 table_id,
156 ..Default::default()
157 })
158 }
159
160 pub async fn create(&self, task: &CreateTableTask) -> Result<TableMetadata> {
161 let table_id = self.allocate_table_id(&task.create_table.table_id).await?;
162 let table_route = self.create_table_route(table_id, task).await?;
163
164 let region_wal_options =
165 self.create_wal_options(&table_route, task.table_info.meta.options.skip_wal)?;
166
167 debug!(
168 "Allocated region wal options {:?} for table {}",
169 region_wal_options, table_id
170 );
171
172 Ok(TableMetadata {
173 table_id,
174 table_route,
175 region_wal_options,
176 })
177 }
178}
179
180pub type PeerAllocatorRef = Arc<dyn PeerAllocator>;
181
182#[async_trait]
184pub trait PeerAllocator: Send + Sync {
185 async fn alloc(&self, regions: usize) -> Result<Vec<Peer>>;
187}
188
/// [`PeerAllocator`] that performs no real placement and only returns default peers.
struct NoopPeerAllocator;
190
191#[async_trait]
192impl PeerAllocator for NoopPeerAllocator {
193 async fn alloc(&self, regions: usize) -> Result<Vec<Peer>> {
194 Ok(vec![Peer::default(); regions])
195 }
196}