common_meta/ddl/
create_logical_tables.rs1mod check;
16mod metadata;
17mod region_request;
18mod update_metadata;
19
20use api::region::RegionResponse;
21use api::v1::CreateTableExpr;
22use async_trait::async_trait;
23use common_catalog::consts::METRIC_ENGINE;
24use common_procedure::error::{FromJsonSnafu, Result as ProcedureResult, ToJsonSnafu};
25use common_procedure::{Context as ProcedureContext, LockKey, Procedure, Status};
26use common_telemetry::{debug, error, warn};
27use futures::future;
28pub use region_request::create_region_request_builder;
29use serde::{Deserialize, Serialize};
30use snafu::ResultExt;
31use store_api::metadata::ColumnMetadata;
32use store_api::metric_engine_consts::ALTER_PHYSICAL_EXTENSION_KEY;
33use store_api::storage::{RegionId, RegionNumber};
34use strum::AsRefStr;
35use table::metadata::{RawTableInfo, TableId};
36
37use crate::ddl::utils::{
38 add_peer_context_if_needed, extract_column_metadatas, map_to_procedure_error,
39 sync_follower_regions,
40};
41use crate::ddl::DdlContext;
42use crate::error::Result;
43use crate::key::table_route::TableRouteValue;
44use crate::lock_key::{CatalogLock, SchemaLock, TableLock, TableNameLock};
45use crate::metrics;
46use crate::rpc::ddl::CreateTableTask;
47use crate::rpc::router::{find_leaders, RegionRoute};
48
/// Procedure that creates a batch of logical tables on top of one physical table.
///
/// The procedure is a small state machine driven by `Procedure::execute`; the
/// current step is stored in `data` so it can be serialized by `dump` and
/// restored by `from_json`.
pub struct CreateLogicalTablesProcedure {
    /// DDL context; this file uses it to reach the table metadata manager and
    /// the node manager.
    pub context: DdlContext,
    /// Serializable state of the procedure.
    pub data: CreateTablesData,
}
53
54impl CreateLogicalTablesProcedure {
55 pub const TYPE_NAME: &'static str = "metasrv-procedure::CreateLogicalTables";
56
57 pub fn new(
58 tasks: Vec<CreateTableTask>,
59 physical_table_id: TableId,
60 context: DdlContext,
61 ) -> Self {
62 Self {
63 context,
64 data: CreateTablesData {
65 state: CreateTablesState::Prepare,
66 tasks,
67 table_ids_already_exists: vec![],
68 physical_table_id,
69 physical_region_numbers: vec![],
70 physical_columns: vec![],
71 },
72 }
73 }
74
75 pub fn from_json(json: &str, context: DdlContext) -> ProcedureResult<Self> {
76 let data = serde_json::from_str(json).context(FromJsonSnafu)?;
77 Ok(Self { context, data })
78 }
79
80 pub(crate) async fn on_prepare(&mut self) -> Result<Status> {
91 self.check_input_tasks()?;
92 self.fill_physical_table_info().await?;
94 self.check_tables_already_exist().await?;
96
97 if self
99 .data
100 .table_ids_already_exists
101 .iter()
102 .all(Option::is_some)
103 {
104 return Ok(Status::done_with_output(
105 self.data
106 .table_ids_already_exists
107 .drain(..)
108 .flatten()
109 .collect::<Vec<_>>(),
110 ));
111 }
112
113 self.allocate_table_ids().await?;
115
116 self.data.state = CreateTablesState::DatanodeCreateRegions;
117 Ok(Status::executing(true))
118 }
119
120 pub async fn on_datanode_create_regions(&mut self) -> Result<Status> {
121 let (_, physical_table_route) = self
122 .context
123 .table_metadata_manager
124 .table_route_manager()
125 .get_physical_table_route(self.data.physical_table_id)
126 .await?;
127
128 self.create_regions(&physical_table_route.region_routes)
129 .await
130 }
131
132 pub async fn on_create_metadata(&mut self) -> Result<Status> {
138 self.update_physical_table_metadata().await?;
139 let table_ids = self.create_logical_tables_metadata().await?;
140
141 Ok(Status::done_with_output(table_ids))
142 }
143
144 async fn create_regions(&mut self, region_routes: &[RegionRoute]) -> Result<Status> {
145 let leaders = find_leaders(region_routes);
146 let mut create_region_tasks = Vec::with_capacity(leaders.len());
147
148 for peer in leaders {
149 let requester = self.context.node_manager.datanode(&peer).await;
150 let Some(request) = self.make_request(&peer, region_routes)? else {
151 debug!("no region request to send to datanode {}", peer);
152 break;
155 };
156
157 create_region_tasks.push(async move {
158 requester
159 .handle(request)
160 .await
161 .map_err(add_peer_context_if_needed(peer))
162 });
163 }
164
165 let mut results = future::join_all(create_region_tasks)
166 .await
167 .into_iter()
168 .collect::<Result<Vec<_>>>()?;
169
170 if let Some(column_metadatas) =
171 extract_column_metadatas(&mut results, ALTER_PHYSICAL_EXTENSION_KEY)?
172 {
173 self.data.physical_columns = column_metadatas;
174 } else {
175 warn!("creating logical table result doesn't contains extension key `{ALTER_PHYSICAL_EXTENSION_KEY}`,leaving the physical table's schema unchanged");
176 }
177
178 self.submit_sync_region_requests(&results, region_routes)
179 .await;
180 self.data.state = CreateTablesState::CreateMetadata;
181 Ok(Status::executing(true))
182 }
183
184 async fn submit_sync_region_requests(
185 &self,
186 results: &[RegionResponse],
187 region_routes: &[RegionRoute],
188 ) {
189 if let Err(err) = sync_follower_regions(
190 &self.context,
191 self.data.physical_table_id,
192 results,
193 region_routes,
194 METRIC_ENGINE,
195 )
196 .await
197 {
198 error!(err; "Failed to sync regions for physical table_id: {}",self.data.physical_table_id);
199 }
200 }
201}
202
203#[async_trait]
204impl Procedure for CreateLogicalTablesProcedure {
205 fn type_name(&self) -> &str {
206 Self::TYPE_NAME
207 }
208
209 async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult<Status> {
210 let state = &self.data.state;
211
212 let _timer = metrics::METRIC_META_PROCEDURE_CREATE_TABLES
213 .with_label_values(&[state.as_ref()])
214 .start_timer();
215
216 match state {
217 CreateTablesState::Prepare => self.on_prepare().await,
218 CreateTablesState::DatanodeCreateRegions => self.on_datanode_create_regions().await,
219 CreateTablesState::CreateMetadata => self.on_create_metadata().await,
220 }
221 .map_err(map_to_procedure_error)
222 }
223
224 fn dump(&self) -> ProcedureResult<String> {
225 serde_json::to_string(&self.data).context(ToJsonSnafu)
226 }
227
228 fn lock_key(&self) -> LockKey {
229 let mut lock_key = Vec::with_capacity(2 + 1 + self.data.tasks.len());
233 let table_ref = self.data.tasks[0].table_ref();
234 lock_key.push(CatalogLock::Read(table_ref.catalog).into());
235 lock_key.push(SchemaLock::read(table_ref.catalog, table_ref.schema).into());
236 lock_key.push(TableLock::Write(self.data.physical_table_id).into());
237
238 for task in &self.data.tasks {
239 lock_key.push(
240 TableNameLock::new(
241 &task.create_table.catalog_name,
242 &task.create_table.schema_name,
243 &task.create_table.table_name,
244 )
245 .into(),
246 );
247 }
248 LockKey::new(lock_key)
249 }
250}
251
/// Serializable state of [`CreateLogicalTablesProcedure`].
#[derive(Debug, Serialize, Deserialize)]
pub struct CreateTablesData {
    /// Current step of the procedure.
    state: CreateTablesState,
    /// Create-table tasks, one per logical table.
    tasks: Vec<CreateTableTask>,
    /// Paired element-wise with `tasks` (see `remaining_tasks`): `Some(id)`
    /// marks a task whose table is treated as already existing, `None` a task
    /// that still has to be created.
    /// NOTE(review): presumably filled by `check_tables_already_exist` — that
    /// code is outside this file.
    table_ids_already_exists: Vec<Option<TableId>>,
    /// Id of the physical table the logical tables are attached to.
    physical_table_id: TableId,
    /// Region numbers combined with each logical table id to build its region
    /// ids (see `remaining_tasks`).
    /// NOTE(review): presumably filled by `fill_physical_table_info` — confirm.
    physical_region_numbers: Vec<RegionNumber>,
    /// Column metadata extracted from the datanode responses under
    /// `ALTER_PHYSICAL_EXTENSION_KEY`; stays empty when the responses do not
    /// carry that key.
    physical_columns: Vec<ColumnMetadata>,
}
261
262impl CreateTablesData {
263 pub fn state(&self) -> &CreateTablesState {
264 &self.state
265 }
266
267 fn all_create_table_exprs(&self) -> Vec<&CreateTableExpr> {
268 self.tasks
269 .iter()
270 .map(|task| &task.create_table)
271 .collect::<Vec<_>>()
272 }
273
274 fn remaining_tasks(&self) -> Vec<(RawTableInfo, TableRouteValue)> {
277 self.tasks
278 .iter()
279 .zip(self.table_ids_already_exists.iter())
280 .flat_map(|(task, table_id)| {
281 if table_id.is_none() {
282 let table_info = task.table_info.clone();
283 let region_ids = self
284 .physical_region_numbers
285 .iter()
286 .map(|region_number| {
287 RegionId::new(table_info.ident.table_id, *region_number)
288 })
289 .collect();
290 let table_route = TableRouteValue::logical(self.physical_table_id, region_ids);
291 Some((table_info, table_route))
292 } else {
293 None
294 }
295 })
296 .collect::<Vec<_>>()
297 }
298}
299
/// Steps of [`CreateLogicalTablesProcedure`].
///
/// The `AsRefStr` string form of the current state is used as the metric label
/// in `Procedure::execute`.
#[derive(Debug, Clone, Serialize, Deserialize, AsRefStr)]
pub enum CreateTablesState {
    /// Initial state; handled by `on_prepare`.
    Prepare,
    /// Handled by `on_datanode_create_regions`, which sends the region
    /// requests to the datanodes.
    DatanodeCreateRegions,
    /// Handled by `on_create_metadata`, which finishes the procedure.
    CreateMetadata,
}