common_meta/ddl/
create_logical_tables.rs1mod check;
16mod metadata;
17mod region_request;
18mod update_metadata;
19
20use api::region::RegionResponse;
21use api::v1::CreateTableExpr;
22use async_trait::async_trait;
23use common_catalog::consts::METRIC_ENGINE;
24use common_procedure::error::{FromJsonSnafu, Result as ProcedureResult, ToJsonSnafu};
25use common_procedure::{Context as ProcedureContext, LockKey, Procedure, Status};
26use common_telemetry::{debug, error, warn};
27use futures::future;
28pub use region_request::create_region_request_builder;
29use serde::{Deserialize, Serialize};
30use snafu::{ensure, ResultExt};
31use store_api::metadata::ColumnMetadata;
32use store_api::metric_engine_consts::ALTER_PHYSICAL_EXTENSION_KEY;
33use store_api::storage::{RegionId, RegionNumber};
34use strum::AsRefStr;
35use table::metadata::{RawTableInfo, TableId};
36
37use crate::ddl::utils::{
38 add_peer_context_if_needed, map_to_procedure_error, sync_follower_regions,
39};
40use crate::ddl::DdlContext;
41use crate::error::{DecodeJsonSnafu, MetadataCorruptionSnafu, Result};
42use crate::key::table_route::TableRouteValue;
43use crate::lock_key::{CatalogLock, SchemaLock, TableLock, TableNameLock};
44use crate::metrics;
45use crate::rpc::ddl::CreateTableTask;
46use crate::rpc::router::{find_leaders, RegionRoute};
47
48pub struct CreateLogicalTablesProcedure {
49 pub context: DdlContext,
50 pub data: CreateTablesData,
51}
52
53impl CreateLogicalTablesProcedure {
54 pub const TYPE_NAME: &'static str = "metasrv-procedure::CreateLogicalTables";
55
56 pub fn new(
57 tasks: Vec<CreateTableTask>,
58 physical_table_id: TableId,
59 context: DdlContext,
60 ) -> Self {
61 Self {
62 context,
63 data: CreateTablesData {
64 state: CreateTablesState::Prepare,
65 tasks,
66 table_ids_already_exists: vec![],
67 physical_table_id,
68 physical_region_numbers: vec![],
69 physical_columns: vec![],
70 },
71 }
72 }
73
74 pub fn from_json(json: &str, context: DdlContext) -> ProcedureResult<Self> {
75 let data = serde_json::from_str(json).context(FromJsonSnafu)?;
76 Ok(Self { context, data })
77 }
78
79 pub(crate) async fn on_prepare(&mut self) -> Result<Status> {
90 self.check_input_tasks()?;
91 self.fill_physical_table_info().await?;
93 self.check_tables_already_exist().await?;
95
96 if self
98 .data
99 .table_ids_already_exists
100 .iter()
101 .all(Option::is_some)
102 {
103 return Ok(Status::done_with_output(
104 self.data
105 .table_ids_already_exists
106 .drain(..)
107 .flatten()
108 .collect::<Vec<_>>(),
109 ));
110 }
111
112 self.allocate_table_ids().await?;
114
115 self.data.state = CreateTablesState::DatanodeCreateRegions;
116 Ok(Status::executing(true))
117 }
118
119 pub async fn on_datanode_create_regions(&mut self) -> Result<Status> {
120 let (_, physical_table_route) = self
121 .context
122 .table_metadata_manager
123 .table_route_manager()
124 .get_physical_table_route(self.data.physical_table_id)
125 .await?;
126
127 self.create_regions(&physical_table_route.region_routes)
128 .await
129 }
130
131 pub async fn on_create_metadata(&mut self) -> Result<Status> {
137 self.update_physical_table_metadata().await?;
138 let table_ids = self.create_logical_tables_metadata().await?;
139
140 Ok(Status::done_with_output(table_ids))
141 }
142
143 async fn create_regions(&mut self, region_routes: &[RegionRoute]) -> Result<Status> {
144 let leaders = find_leaders(region_routes);
145 let mut create_region_tasks = Vec::with_capacity(leaders.len());
146
147 for peer in leaders {
148 let requester = self.context.node_manager.datanode(&peer).await;
149 let Some(request) = self.make_request(&peer, region_routes)? else {
150 debug!("no region request to send to datanode {}", peer);
151 break;
154 };
155
156 create_region_tasks.push(async move {
157 requester
158 .handle(request)
159 .await
160 .map_err(add_peer_context_if_needed(peer))
161 });
162 }
163
164 let mut results = future::join_all(create_region_tasks)
165 .await
166 .into_iter()
167 .collect::<Result<Vec<_>>>()?;
168
169 let phy_raw_schemas = results
171 .iter_mut()
172 .map(|res| res.extensions.remove(ALTER_PHYSICAL_EXTENSION_KEY))
173 .collect::<Vec<_>>();
174
175 if phy_raw_schemas.is_empty() {
176 self.submit_sync_region_requests(results, region_routes)
177 .await;
178 self.data.state = CreateTablesState::CreateMetadata;
179 return Ok(Status::executing(false));
180 }
181
182 let first = phy_raw_schemas.first().unwrap();
185 ensure!(
186 phy_raw_schemas.iter().all(|x| x == first),
187 MetadataCorruptionSnafu {
188 err_msg: "The physical schemas from datanodes are not the same."
189 }
190 );
191
192 if let Some(phy_raw_schemas) = first {
194 self.data.physical_columns =
195 ColumnMetadata::decode_list(phy_raw_schemas).context(DecodeJsonSnafu)?;
196 } else {
197 warn!("creating logical table result doesn't contains extension key `{ALTER_PHYSICAL_EXTENSION_KEY}`,leaving the physical table's schema unchanged");
198 }
199
200 self.submit_sync_region_requests(results, region_routes)
201 .await;
202 self.data.state = CreateTablesState::CreateMetadata;
203
204 Ok(Status::executing(true))
205 }
206
207 async fn submit_sync_region_requests(
208 &self,
209 results: Vec<RegionResponse>,
210 region_routes: &[RegionRoute],
211 ) {
212 if let Err(err) = sync_follower_regions(
213 &self.context,
214 self.data.physical_table_id,
215 results,
216 region_routes,
217 METRIC_ENGINE,
218 )
219 .await
220 {
221 error!(err; "Failed to sync regions for physical table_id: {}",self.data.physical_table_id);
222 }
223 }
224}
225
226#[async_trait]
227impl Procedure for CreateLogicalTablesProcedure {
228 fn type_name(&self) -> &str {
229 Self::TYPE_NAME
230 }
231
232 async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult<Status> {
233 let state = &self.data.state;
234
235 let _timer = metrics::METRIC_META_PROCEDURE_CREATE_TABLES
236 .with_label_values(&[state.as_ref()])
237 .start_timer();
238
239 match state {
240 CreateTablesState::Prepare => self.on_prepare().await,
241 CreateTablesState::DatanodeCreateRegions => self.on_datanode_create_regions().await,
242 CreateTablesState::CreateMetadata => self.on_create_metadata().await,
243 }
244 .map_err(map_to_procedure_error)
245 }
246
247 fn dump(&self) -> ProcedureResult<String> {
248 serde_json::to_string(&self.data).context(ToJsonSnafu)
249 }
250
251 fn lock_key(&self) -> LockKey {
252 let mut lock_key = Vec::with_capacity(2 + 1 + self.data.tasks.len());
256 let table_ref = self.data.tasks[0].table_ref();
257 lock_key.push(CatalogLock::Read(table_ref.catalog).into());
258 lock_key.push(SchemaLock::read(table_ref.catalog, table_ref.schema).into());
259 lock_key.push(TableLock::Write(self.data.physical_table_id).into());
260
261 for task in &self.data.tasks {
262 lock_key.push(
263 TableNameLock::new(
264 &task.create_table.catalog_name,
265 &task.create_table.schema_name,
266 &task.create_table.table_name,
267 )
268 .into(),
269 );
270 }
271 LockKey::new(lock_key)
272 }
273}
274
275#[derive(Debug, Serialize, Deserialize)]
276pub struct CreateTablesData {
277 state: CreateTablesState,
278 tasks: Vec<CreateTableTask>,
279 table_ids_already_exists: Vec<Option<TableId>>,
280 physical_table_id: TableId,
281 physical_region_numbers: Vec<RegionNumber>,
282 physical_columns: Vec<ColumnMetadata>,
283}
284
285impl CreateTablesData {
286 pub fn state(&self) -> &CreateTablesState {
287 &self.state
288 }
289
290 fn all_create_table_exprs(&self) -> Vec<&CreateTableExpr> {
291 self.tasks
292 .iter()
293 .map(|task| &task.create_table)
294 .collect::<Vec<_>>()
295 }
296
297 fn remaining_tasks(&self) -> Vec<(RawTableInfo, TableRouteValue)> {
300 self.tasks
301 .iter()
302 .zip(self.table_ids_already_exists.iter())
303 .flat_map(|(task, table_id)| {
304 if table_id.is_none() {
305 let table_info = task.table_info.clone();
306 let region_ids = self
307 .physical_region_numbers
308 .iter()
309 .map(|region_number| {
310 RegionId::new(table_info.ident.table_id, *region_number)
311 })
312 .collect();
313 let table_route = TableRouteValue::logical(self.physical_table_id, region_ids);
314 Some((table_info, table_route))
315 } else {
316 None
317 }
318 })
319 .collect::<Vec<_>>()
320 }
321}
322
323#[derive(Debug, Clone, Serialize, Deserialize, AsRefStr)]
324pub enum CreateTablesState {
325 Prepare,
327 DatanodeCreateRegions,
329 CreateMetadata,
331}