operator/req_convert/insert/
stmt_to_region.rs1use api::helper::{ColumnDataTypeWrapper, to_grpc_value};
16use api::v1::column_def::options_from_column_schema;
17use api::v1::region::InsertRequests as RegionInsertRequests;
18use api::v1::{ColumnSchema as GrpcColumnSchema, Row, Rows};
19use catalog::CatalogManager;
20use common_time::Timezone;
21use datatypes::schema::{ColumnSchema, SchemaRef};
22use datatypes::value::Value;
23use partition::manager::PartitionRuleManager;
24use session::context::{QueryContext, QueryContextRef};
25use snafu::{OptionExt, ResultExt, ensure};
26use sql::ast::ObjectNamePartExt;
27use sql::statements::insert::Insert;
28use sqlparser::ast::{ObjectName, Value as SqlValue};
29use table::TableRef;
30use table::metadata::TableInfoRef;
31
32use crate::error::{
33 CatalogSnafu, ColumnDataTypeSnafu, ColumnDefaultValueSnafu, ColumnNoneDefaultValueSnafu,
34 ColumnNotFoundSnafu, InvalidInsertRequestSnafu, InvalidSqlSnafu, MissingInsertBodySnafu,
35 ParseSqlSnafu, Result, SchemaReadOnlySnafu, TableNotFoundSnafu,
36};
37use crate::insert::InstantAndNormalInsertRequests;
38use crate::req_convert::common::partitioner::Partitioner;
39use crate::req_convert::insert::semantic_type;
40
/// Lower-case text of the SQL placeholder that asks for a column's default value.
///
/// `replace_default` compares placeholder values against this string case-insensitively.
const DEFAULT_PLACEHOLDER_VALUE: &str = "default";
42
43pub struct StatementToRegion<'a> {
44 catalog_manager: &'a dyn CatalogManager,
45 partition_manager: &'a PartitionRuleManager,
46 ctx: &'a QueryContext,
47}
48
49impl<'a> StatementToRegion<'a> {
50 pub fn new(
51 catalog_manager: &'a dyn CatalogManager,
52 partition_manager: &'a PartitionRuleManager,
53 ctx: &'a QueryContext,
54 ) -> Self {
55 Self {
56 catalog_manager,
57 partition_manager,
58 ctx,
59 }
60 }
61
62 pub async fn convert(
63 &self,
64 stmt: &Insert,
65 query_ctx: &QueryContextRef,
66 ) -> Result<(InstantAndNormalInsertRequests, TableInfoRef)> {
67 let name = stmt.table_name().context(ParseSqlSnafu)?;
68 let (catalog, schema, table_name) = self.get_full_name(name)?;
69 let table = self.get_table(&catalog, &schema, &table_name).await?;
70 let table_schema = table.schema();
71
72 ensure!(
73 !common_catalog::consts::is_readonly_schema(&schema),
74 SchemaReadOnlySnafu { name: schema }
75 );
76
77 let column_names = column_names(stmt, &table_schema);
78 let column_count = column_names.len();
79
80 let sql_rows = stmt.values_body().context(MissingInsertBodySnafu)?;
81 let row_count = sql_rows.len();
82
83 sql_rows.iter().try_for_each(|r| {
84 ensure!(
85 r.len() == column_count,
86 InvalidSqlSnafu {
87 err_msg: format!(
88 "column count mismatch, columns: {}, values: {}",
89 column_count,
90 r.len()
91 )
92 }
93 );
94 Ok(())
95 })?;
96
97 let mut rows = vec![
98 Row {
99 values: Vec::with_capacity(column_count)
100 };
101 row_count
102 ];
103
104 fn find_insert_columns<'a>(
105 table: &'a TableRef,
106 column_names: &[&String],
107 ) -> Result<Vec<&'a ColumnSchema>> {
108 let schema = table.schema_ref();
109 column_names
110 .iter()
111 .map(|name| {
112 schema
113 .column_schema_by_name(name)
114 .context(ColumnNotFoundSnafu { msg: *name })
115 })
116 .collect::<Result<Vec<_>>>()
117 }
118
119 let insert_columns = find_insert_columns(&table, &column_names)?;
120 let converter = SqlRowConverter::new(&insert_columns, query_ctx);
121 let value_rows = converter.convert(&sql_rows)?;
122 for (i, row) in value_rows.into_iter().enumerate() {
123 for value in row {
124 let grpc_value = to_grpc_value(value);
125 rows[i].values.push(grpc_value);
126 }
127 }
128
129 let table_info = table.table_info();
130 let mut schema = Vec::with_capacity(column_count);
131 for column_schema in insert_columns {
132 let (datatype, datatype_extension) =
133 ColumnDataTypeWrapper::try_from(column_schema.data_type.clone())
134 .context(ColumnDataTypeSnafu)?
135 .to_parts();
136
137 let column_name = &column_schema.name;
138 let semantic_type = semantic_type(&table_info, column_name)?;
139
140 let grpc_column_schema = GrpcColumnSchema {
141 column_name: column_name.clone(),
142 datatype: datatype.into(),
143 semantic_type: semantic_type.into(),
144 datatype_extension,
145 options: options_from_column_schema(column_schema),
146 };
147 schema.push(grpc_column_schema);
148 }
149
150 let requests = Partitioner::new(self.partition_manager)
151 .partition_insert_requests(&table_info, Rows { schema, rows })
152 .await?;
153 let requests = RegionInsertRequests { requests };
154 if table_info.is_ttl_instant_table() {
155 Ok((
156 InstantAndNormalInsertRequests {
157 normal_requests: Default::default(),
158 instant_requests: requests,
159 },
160 table_info,
161 ))
162 } else {
163 Ok((
164 InstantAndNormalInsertRequests {
165 normal_requests: requests,
166 instant_requests: Default::default(),
167 },
168 table_info,
169 ))
170 }
171 }
172
173 async fn get_table(&self, catalog: &str, schema: &str, table: &str) -> Result<TableRef> {
174 self.catalog_manager
175 .table(catalog, schema, table, None)
176 .await
177 .context(CatalogSnafu)?
178 .with_context(|| TableNotFoundSnafu {
179 table_name: common_catalog::format_full_table_name(catalog, schema, table),
180 })
181 }
182
183 fn get_full_name(&self, obj_name: &ObjectName) -> Result<(String, String, String)> {
184 match &obj_name.0[..] {
185 [table] => Ok((
186 self.ctx.current_catalog().to_owned(),
187 self.ctx.current_schema(),
188 table.to_string_unquoted(),
189 )),
190 [schema, table] => Ok((
191 self.ctx.current_catalog().to_owned(),
192 schema.to_string_unquoted(),
193 table.to_string_unquoted(),
194 )),
195 [catalog, schema, table] => Ok((
196 catalog.to_string_unquoted(),
197 schema.to_string_unquoted(),
198 table.to_string_unquoted(),
199 )),
200 _ => InvalidSqlSnafu {
201 err_msg: format!(
202 "expect table name to be <catalog>.<schema>.<table>, <schema>.<table> or <table>, actual: {obj_name}",
203 ),
204 }.fail(),
205 }
206 }
207}
208
209struct SqlRowConverter<'a, 'b> {
210 insert_columns: &'a [&'a ColumnSchema],
211 query_context: &'b QueryContextRef,
212}
213
214impl<'a, 'b> SqlRowConverter<'a, 'b> {
215 fn new(insert_columns: &'a [&'a ColumnSchema], query_context: &'b QueryContextRef) -> Self {
216 Self {
217 insert_columns,
218 query_context,
219 }
220 }
221
222 fn convert(&self, sql_rows: &[Vec<SqlValue>]) -> Result<Vec<Vec<Value>>> {
223 let timezone = Some(&self.query_context.timezone());
224 let auto_string_to_numeric = self.query_context.auto_string_to_numeric();
225
226 let mut value_rows = Vec::with_capacity(sql_rows.len());
227 for sql_row in sql_rows {
228 let mut value_row = Vec::with_capacity(self.insert_columns.len());
229
230 for (insert_column, sql_value) in self.insert_columns.iter().zip(sql_row) {
231 let value =
232 sql_value_to_value(insert_column, sql_value, timezone, auto_string_to_numeric)?;
233 value_row.push(value);
234 }
235 value_rows.push(value_row);
236 }
237 Ok(value_rows)
238 }
239}
240
241fn column_names<'a>(stmt: &'a Insert, table_schema: &'a SchemaRef) -> Vec<&'a String> {
242 if !stmt.columns().is_empty() {
243 stmt.columns()
244 } else {
245 table_schema
246 .column_schemas()
247 .iter()
248 .map(|column| &column.name)
249 .collect()
250 }
251}
252
253fn sql_value_to_value(
257 column_schema: &ColumnSchema,
258 sql_val: &SqlValue,
259 timezone: Option<&Timezone>,
260 auto_string_to_numeric: bool,
261) -> Result<Value> {
262 let column = &column_schema.name;
263 let value = if replace_default(sql_val) {
264 let default_value = column_schema
265 .create_default()
266 .context(ColumnDefaultValueSnafu {
267 column: column.clone(),
268 })?;
269
270 default_value.context(ColumnNoneDefaultValueSnafu {
271 column: column.clone(),
272 })?
273 } else {
274 common_sql::convert::sql_value_to_value(
275 column_schema,
276 sql_val,
277 timezone,
278 None,
279 auto_string_to_numeric,
280 )
281 .context(crate::error::SqlCommonSnafu)?
282 };
283 validate(&value)?;
284 Ok(value)
285}
286
287fn validate(value: &Value) -> Result<()> {
288 match value {
289 Value::Json(value) => {
290 ensure!(
293 !value.is_empty_object(),
294 InvalidInsertRequestSnafu {
295 reason: "empty json object is not supported, consider adding a dummy field"
296 }
297 );
298 Ok(())
299 }
300 _ => Ok(()),
301 }
302}
303
304fn replace_default(sql_val: &SqlValue) -> bool {
305 matches!(sql_val, SqlValue::Placeholder(s) if s.to_lowercase() == DEFAULT_PLACEHOLDER_VALUE)
306}