1use std::cell::LazyCell;
16use std::collections::HashMap;
17
18use api::helper::{ColumnDataTypeWrapper, to_grpc_value};
19use api::v1::alter_table_expr::Kind;
20use api::v1::column_def::options_from_column_schema;
21use api::v1::region::InsertRequests as RegionInsertRequests;
22use api::v1::{
23 AlterTableExpr, ColumnSchema as GrpcColumnSchema, ModifyColumnType, ModifyColumnTypes, Row,
24 Rows,
25};
26use catalog::CatalogManager;
27use common_telemetry::info;
28use common_time::Timezone;
29use datatypes::data_type::ConcreteDataType;
30use datatypes::schema::{ColumnSchema, SchemaRef};
31use datatypes::types::JsonType;
32use datatypes::value::Value;
33use partition::manager::PartitionRuleManager;
34use session::context::{QueryContext, QueryContextRef};
35use snafu::{OptionExt, ResultExt, ensure};
36use sql::ast::ObjectNamePartExt;
37use sql::statements::insert::Insert;
38use sqlparser::ast::{ObjectName, Value as SqlValue};
39use table::TableRef;
40use table::metadata::TableInfoRef;
41
42use crate::error::{
43 CatalogSnafu, ColumnDataTypeSnafu, ColumnDefaultValueSnafu, ColumnNoneDefaultValueSnafu,
44 ColumnNotFoundSnafu, InvalidInsertRequestSnafu, InvalidSqlSnafu, MissingInsertBodySnafu,
45 ParseSqlSnafu, Result, SchemaReadOnlySnafu, TableNotFoundSnafu,
46};
47use crate::insert::InstantAndNormalInsertRequests;
48use crate::req_convert::common::partitioner::Partitioner;
49use crate::req_convert::insert::semantic_type;
50use crate::statement::StatementExecutor;
51
/// Placeholder text that, when it appears as a SQL placeholder value in an insert row,
/// requests the column's default value instead of a literal.
const DEFAULT_PLACEHOLDER_VALUE: &str = "default";
53
54pub struct StatementToRegion<'a> {
55 catalog_manager: &'a dyn CatalogManager,
56 partition_manager: &'a PartitionRuleManager,
57 ctx: &'a QueryContext,
58}
59
60impl<'a> StatementToRegion<'a> {
61 pub fn new(
62 catalog_manager: &'a dyn CatalogManager,
63 partition_manager: &'a PartitionRuleManager,
64 ctx: &'a QueryContext,
65 ) -> Self {
66 Self {
67 catalog_manager,
68 partition_manager,
69 ctx,
70 }
71 }
72
73 pub async fn convert(
74 &self,
75 stmt: &Insert,
76 query_ctx: &QueryContextRef,
77 statement_executor: &StatementExecutor,
78 ) -> Result<(InstantAndNormalInsertRequests, TableInfoRef)> {
79 let name = stmt.table_name().context(ParseSqlSnafu)?;
80 let (catalog, schema, table_name) = self.get_full_name(name)?;
81 let mut table = self.get_table(&catalog, &schema, &table_name).await?;
82 let table_schema = table.schema();
83
84 ensure!(
85 !common_catalog::consts::is_readonly_schema(&schema),
86 SchemaReadOnlySnafu { name: schema }
87 );
88
89 let column_names = column_names(stmt, &table_schema);
90 let column_count = column_names.len();
91
92 let sql_rows = stmt.values_body().context(MissingInsertBodySnafu)?;
93 let row_count = sql_rows.len();
94
95 sql_rows.iter().try_for_each(|r| {
96 ensure!(
97 r.len() == column_count,
98 InvalidSqlSnafu {
99 err_msg: format!(
100 "column count mismatch, columns: {}, values: {}",
101 column_count,
102 r.len()
103 )
104 }
105 );
106 Ok(())
107 })?;
108
109 let mut rows = vec![
110 Row {
111 values: Vec::with_capacity(column_count)
112 };
113 row_count
114 ];
115
116 fn find_insert_columns<'a>(
117 table: &'a TableRef,
118 column_names: &[&String],
119 ) -> Result<Vec<&'a ColumnSchema>> {
120 let schema = table.schema_ref();
121 column_names
122 .iter()
123 .map(|name| {
124 schema
125 .column_schema_by_name(name)
126 .context(ColumnNotFoundSnafu { msg: *name })
127 })
128 .collect::<Result<Vec<_>>>()
129 }
130
131 let mut insert_columns = find_insert_columns(&table, &column_names)?;
132 let converter = SqlRowConverter::new(&insert_columns, query_ctx);
133
134 let mut updater = JsonColumnTypeUpdater::new(statement_executor, query_ctx);
137 let value_rows = converter.convert(&mut updater, &sql_rows)?;
138
139 if updater
142 .maybe_update_column_type(&catalog, &schema, &table_name, &insert_columns)
143 .await?
144 {
145 table = self.get_table(&catalog, &schema, &table_name).await?;
147 insert_columns = find_insert_columns(&table, &column_names)?;
148 }
149
150 for (i, row) in value_rows.into_iter().enumerate() {
152 for value in row {
153 let grpc_value = to_grpc_value(value);
154 rows[i].values.push(grpc_value);
155 }
156 }
157
158 let table_info = table.table_info();
159 let mut schema = Vec::with_capacity(column_count);
160 for column_schema in insert_columns {
161 let (datatype, datatype_extension) =
162 ColumnDataTypeWrapper::try_from(column_schema.data_type.clone())
163 .context(ColumnDataTypeSnafu)?
164 .to_parts();
165
166 let column_name = &column_schema.name;
167 let semantic_type = semantic_type(&table_info, column_name)?;
168
169 let grpc_column_schema = GrpcColumnSchema {
170 column_name: column_name.clone(),
171 datatype: datatype.into(),
172 semantic_type: semantic_type.into(),
173 datatype_extension,
174 options: options_from_column_schema(column_schema),
175 };
176 schema.push(grpc_column_schema);
177 }
178
179 let requests = Partitioner::new(self.partition_manager)
180 .partition_insert_requests(&table_info, Rows { schema, rows })
181 .await?;
182 let requests = RegionInsertRequests { requests };
183 if table_info.is_ttl_instant_table() {
184 Ok((
185 InstantAndNormalInsertRequests {
186 normal_requests: Default::default(),
187 instant_requests: requests,
188 },
189 table_info,
190 ))
191 } else {
192 Ok((
193 InstantAndNormalInsertRequests {
194 normal_requests: requests,
195 instant_requests: Default::default(),
196 },
197 table_info,
198 ))
199 }
200 }
201
202 async fn get_table(&self, catalog: &str, schema: &str, table: &str) -> Result<TableRef> {
203 self.catalog_manager
204 .table(catalog, schema, table, None)
205 .await
206 .context(CatalogSnafu)?
207 .with_context(|| TableNotFoundSnafu {
208 table_name: common_catalog::format_full_table_name(catalog, schema, table),
209 })
210 }
211
212 fn get_full_name(&self, obj_name: &ObjectName) -> Result<(String, String, String)> {
213 match &obj_name.0[..] {
214 [table] => Ok((
215 self.ctx.current_catalog().to_owned(),
216 self.ctx.current_schema(),
217 table.to_string_unquoted(),
218 )),
219 [schema, table] => Ok((
220 self.ctx.current_catalog().to_owned(),
221 schema.to_string_unquoted(),
222 table.to_string_unquoted(),
223 )),
224 [catalog, schema, table] => Ok((
225 catalog.to_string_unquoted(),
226 schema.to_string_unquoted(),
227 table.to_string_unquoted(),
228 )),
229 _ => InvalidSqlSnafu {
230 err_msg: format!(
231 "expect table name to be <catalog>.<schema>.<table>, <schema>.<table> or <table>, actual: {obj_name}",
232 ),
233 }.fail(),
234 }
235 }
236}
237
238struct SqlRowConverter<'a, 'b> {
239 insert_columns: &'a [&'a ColumnSchema],
240 query_context: &'b QueryContextRef,
241}
242
243impl<'a, 'b> SqlRowConverter<'a, 'b> {
244 fn new(insert_columns: &'a [&'a ColumnSchema], query_context: &'b QueryContextRef) -> Self {
245 Self {
246 insert_columns,
247 query_context,
248 }
249 }
250
251 fn convert(
252 &self,
253 updater: &mut JsonColumnTypeUpdater<'_, 'a>,
254 sql_rows: &[Vec<SqlValue>],
255 ) -> Result<Vec<Vec<Value>>> {
256 let timezone = Some(&self.query_context.timezone());
257 let auto_string_to_numeric = self.query_context.auto_string_to_numeric();
258
259 let mut value_rows = Vec::with_capacity(sql_rows.len());
260 for sql_row in sql_rows {
261 let mut value_row = Vec::with_capacity(self.insert_columns.len());
262
263 for (insert_column, sql_value) in self.insert_columns.iter().zip(sql_row) {
264 let value =
265 sql_value_to_value(insert_column, sql_value, timezone, auto_string_to_numeric)?;
266
267 updater.merge_types(insert_column, &value)?;
268
269 value_row.push(value);
270 }
271 value_rows.push(value_row);
272 }
273 Ok(value_rows)
274 }
275}
276
277struct JsonColumnTypeUpdater<'a, 'b> {
278 statement_executor: &'a StatementExecutor,
279 query_context: &'a QueryContextRef,
280 merged_value_types: LazyCell<HashMap<&'b str, JsonType>>,
281}
282
283impl<'a, 'b> JsonColumnTypeUpdater<'a, 'b> {
284 fn new(statement_executor: &'a StatementExecutor, query_context: &'a QueryContextRef) -> Self {
285 Self {
286 statement_executor,
287 query_context,
288 merged_value_types: LazyCell::new(Default::default),
289 }
290 }
291
292 fn merge_types(&mut self, column_schema: &'b ColumnSchema, value: &Value) -> Result<()> {
293 if !matches!(value, Value::Json(_)) {
294 return Ok(());
295 }
296
297 if let ConcreteDataType::Json(value_type) = value.data_type() {
298 let merged_type = self
299 .merged_value_types
300 .entry(&column_schema.name)
301 .or_insert_with(|| value_type.clone());
302
303 if !merged_type.is_include(&value_type) {
304 merged_type.merge(&value_type).map_err(|e| {
305 InvalidInsertRequestSnafu {
306 reason: format!(r#"cannot merge "{value_type}" into "{merged_type}": {e}"#),
307 }
308 .build()
309 })?;
310 }
311 }
312 Ok(())
313 }
314
315 async fn maybe_update_column_type(
316 self,
317 catalog: &str,
318 schema: &str,
319 table: &str,
320 insert_columns: &[&ColumnSchema],
321 ) -> Result<bool> {
322 let mut has_update = false;
323 for (column_name, merged_type) in self.merged_value_types.iter() {
324 let Some(column_type) = insert_columns
325 .iter()
326 .find_map(|x| (&x.name == column_name).then(|| x.data_type.as_json()))
327 .flatten()
328 else {
329 continue;
330 };
331 if column_type.is_include(merged_type) {
332 continue;
333 }
334
335 let new_column_type = {
336 let mut x = column_type.clone();
337 x.merge(merged_type)
338 .map_err(|e| {
339 InvalidInsertRequestSnafu {
340 reason: format!(
341 r#"cannot merge "{merged_type}" into "{column_type}": {e}"#
342 ),
343 }
344 .build()
345 })
346 .map(|()| x)
347 }?;
348 info!(
349 "updating table {}.{}.{} column {} json type: {} => {}",
350 catalog, schema, table, column_name, column_type, new_column_type,
351 );
352
353 let (target_type, target_type_extension) =
354 ColumnDataTypeWrapper::try_from(ConcreteDataType::Json(new_column_type))
355 .context(ColumnDataTypeSnafu)?
356 .into_parts();
357 let alter_expr = AlterTableExpr {
358 catalog_name: catalog.to_string(),
359 schema_name: schema.to_string(),
360 table_name: table.to_string(),
361 kind: Some(Kind::ModifyColumnTypes(ModifyColumnTypes {
362 modify_column_types: vec![ModifyColumnType {
363 column_name: column_name.to_string(),
364 target_type: target_type as i32,
365 target_type_extension,
366 }],
367 })),
368 };
369 self.statement_executor
370 .alter_table_inner(alter_expr, self.query_context.clone())
371 .await?;
372
373 has_update = true;
374 }
375 Ok(has_update)
376 }
377}
378
379fn column_names<'a>(stmt: &'a Insert, table_schema: &'a SchemaRef) -> Vec<&'a String> {
380 if !stmt.columns().is_empty() {
381 stmt.columns()
382 } else {
383 table_schema
384 .column_schemas()
385 .iter()
386 .map(|column| &column.name)
387 .collect()
388 }
389}
390
391fn sql_value_to_value(
395 column_schema: &ColumnSchema,
396 sql_val: &SqlValue,
397 timezone: Option<&Timezone>,
398 auto_string_to_numeric: bool,
399) -> Result<Value> {
400 let column = &column_schema.name;
401 let value = if replace_default(sql_val) {
402 let default_value = column_schema
403 .create_default()
404 .context(ColumnDefaultValueSnafu {
405 column: column.clone(),
406 })?;
407
408 default_value.context(ColumnNoneDefaultValueSnafu {
409 column: column.clone(),
410 })?
411 } else {
412 common_sql::convert::sql_value_to_value(
413 column,
414 &column_schema.data_type,
415 sql_val,
416 timezone,
417 None,
418 auto_string_to_numeric,
419 )
420 .context(crate::error::SqlCommonSnafu)?
421 };
422 validate(&value)?;
423 Ok(value)
424}
425
426fn validate(value: &Value) -> Result<()> {
427 match value {
428 Value::Json(value) => {
429 ensure!(
432 !value.is_empty_object(),
433 InvalidInsertRequestSnafu {
434 reason: "empty json object is not supported, consider adding a dummy field"
435 }
436 );
437 Ok(())
438 }
439 _ => Ok(()),
440 }
441}
442
443fn replace_default(sql_val: &SqlValue) -> bool {
444 matches!(sql_val, SqlValue::Placeholder(s) if s.to_lowercase() == DEFAULT_PLACEHOLDER_VALUE)
445}