Skip to main content

operator/req_convert/insert/
stmt_to_region.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use api::helper::{ColumnDataTypeWrapper, to_grpc_value};
16use api::v1::column_def::options_from_column_schema;
17use api::v1::region::InsertRequests as RegionInsertRequests;
18use api::v1::{ColumnSchema as GrpcColumnSchema, Row, Rows};
19use catalog::CatalogManager;
20use common_time::Timezone;
21use datatypes::schema::{ColumnSchema, SchemaRef};
22use datatypes::value::Value;
23use partition::manager::PartitionRuleManager;
24use session::context::{QueryContext, QueryContextRef};
25use snafu::{OptionExt, ResultExt, ensure};
26use sql::ast::ObjectNamePartExt;
27use sql::statements::insert::Insert;
28use sqlparser::ast::{ObjectName, Value as SqlValue};
29use table::TableRef;
30use table::metadata::TableInfoRef;
31
32use crate::error::{
33    CatalogSnafu, ColumnDataTypeSnafu, ColumnDefaultValueSnafu, ColumnNoneDefaultValueSnafu,
34    ColumnNotFoundSnafu, InvalidInsertRequestSnafu, InvalidSqlSnafu, MissingInsertBodySnafu,
35    ParseSqlSnafu, Result, SchemaReadOnlySnafu, TableNotFoundSnafu,
36};
37use crate::insert::InstantAndNormalInsertRequests;
38use crate::req_convert::common::partitioner::Partitioner;
39use crate::req_convert::insert::semantic_type;
40
/// Placeholder text (compared case-insensitively) that asks for a column's default value
/// instead of a literal in an `INSERT ... VALUES` row.
const DEFAULT_PLACEHOLDER_VALUE: &str = "default";
42
43pub struct StatementToRegion<'a> {
44    catalog_manager: &'a dyn CatalogManager,
45    partition_manager: &'a PartitionRuleManager,
46    ctx: &'a QueryContext,
47}
48
49impl<'a> StatementToRegion<'a> {
50    pub fn new(
51        catalog_manager: &'a dyn CatalogManager,
52        partition_manager: &'a PartitionRuleManager,
53        ctx: &'a QueryContext,
54    ) -> Self {
55        Self {
56            catalog_manager,
57            partition_manager,
58            ctx,
59        }
60    }
61
62    pub async fn convert(
63        &self,
64        stmt: &Insert,
65        query_ctx: &QueryContextRef,
66    ) -> Result<(InstantAndNormalInsertRequests, TableInfoRef)> {
67        let name = stmt.table_name().context(ParseSqlSnafu)?;
68        let (catalog, schema, table_name) = self.get_full_name(name)?;
69        let table = self.get_table(&catalog, &schema, &table_name).await?;
70        let table_schema = table.schema();
71
72        ensure!(
73            !common_catalog::consts::is_readonly_schema(&schema),
74            SchemaReadOnlySnafu { name: schema }
75        );
76
77        let column_names = column_names(stmt, &table_schema);
78        let column_count = column_names.len();
79
80        let sql_rows = stmt.values_body().context(MissingInsertBodySnafu)?;
81        let row_count = sql_rows.len();
82
83        sql_rows.iter().try_for_each(|r| {
84            ensure!(
85                r.len() == column_count,
86                InvalidSqlSnafu {
87                    err_msg: format!(
88                        "column count mismatch, columns: {}, values: {}",
89                        column_count,
90                        r.len()
91                    )
92                }
93            );
94            Ok(())
95        })?;
96
97        let mut rows = vec![
98            Row {
99                values: Vec::with_capacity(column_count)
100            };
101            row_count
102        ];
103
104        fn find_insert_columns<'a>(
105            table: &'a TableRef,
106            column_names: &[&String],
107        ) -> Result<Vec<&'a ColumnSchema>> {
108            let schema = table.schema_ref();
109            column_names
110                .iter()
111                .map(|name| {
112                    schema
113                        .column_schema_by_name(name)
114                        .context(ColumnNotFoundSnafu { msg: *name })
115                })
116                .collect::<Result<Vec<_>>>()
117        }
118
119        let insert_columns = find_insert_columns(&table, &column_names)?;
120        let converter = SqlRowConverter::new(&insert_columns, query_ctx);
121        let value_rows = converter.convert(&sql_rows)?;
122        for (i, row) in value_rows.into_iter().enumerate() {
123            for value in row {
124                let grpc_value = to_grpc_value(value);
125                rows[i].values.push(grpc_value);
126            }
127        }
128
129        let table_info = table.table_info();
130        let mut schema = Vec::with_capacity(column_count);
131        for column_schema in insert_columns {
132            let (datatype, datatype_extension) =
133                ColumnDataTypeWrapper::try_from(column_schema.data_type.clone())
134                    .context(ColumnDataTypeSnafu)?
135                    .to_parts();
136
137            let column_name = &column_schema.name;
138            let semantic_type = semantic_type(&table_info, column_name)?;
139
140            let grpc_column_schema = GrpcColumnSchema {
141                column_name: column_name.clone(),
142                datatype: datatype.into(),
143                semantic_type: semantic_type.into(),
144                datatype_extension,
145                options: options_from_column_schema(column_schema),
146            };
147            schema.push(grpc_column_schema);
148        }
149
150        let requests = Partitioner::new(self.partition_manager)
151            .partition_insert_requests(&table_info, Rows { schema, rows })
152            .await?;
153        let requests = RegionInsertRequests { requests };
154        if table_info.is_ttl_instant_table() {
155            Ok((
156                InstantAndNormalInsertRequests {
157                    normal_requests: Default::default(),
158                    instant_requests: requests,
159                },
160                table_info,
161            ))
162        } else {
163            Ok((
164                InstantAndNormalInsertRequests {
165                    normal_requests: requests,
166                    instant_requests: Default::default(),
167                },
168                table_info,
169            ))
170        }
171    }
172
173    async fn get_table(&self, catalog: &str, schema: &str, table: &str) -> Result<TableRef> {
174        self.catalog_manager
175            .table(catalog, schema, table, None)
176            .await
177            .context(CatalogSnafu)?
178            .with_context(|| TableNotFoundSnafu {
179                table_name: common_catalog::format_full_table_name(catalog, schema, table),
180            })
181    }
182
183    fn get_full_name(&self, obj_name: &ObjectName) -> Result<(String, String, String)> {
184        match &obj_name.0[..] {
185            [table] => Ok((
186                self.ctx.current_catalog().to_owned(),
187                self.ctx.current_schema(),
188                table.to_string_unquoted(),
189            )),
190            [schema, table] => Ok((
191                self.ctx.current_catalog().to_owned(),
192                schema.to_string_unquoted(),
193                table.to_string_unquoted(),
194            )),
195            [catalog, schema, table] => Ok((
196                catalog.to_string_unquoted(),
197                schema.to_string_unquoted(),
198                table.to_string_unquoted(),
199            )),
200            _ => InvalidSqlSnafu {
201                err_msg: format!(
202                    "expect table name to be <catalog>.<schema>.<table>, <schema>.<table> or <table>, actual: {obj_name}",
203                ),
204            }.fail(),
205        }
206    }
207}
208
209struct SqlRowConverter<'a, 'b> {
210    insert_columns: &'a [&'a ColumnSchema],
211    query_context: &'b QueryContextRef,
212}
213
214impl<'a, 'b> SqlRowConverter<'a, 'b> {
215    fn new(insert_columns: &'a [&'a ColumnSchema], query_context: &'b QueryContextRef) -> Self {
216        Self {
217            insert_columns,
218            query_context,
219        }
220    }
221
222    fn convert(&self, sql_rows: &[Vec<SqlValue>]) -> Result<Vec<Vec<Value>>> {
223        let timezone = Some(&self.query_context.timezone());
224        let auto_string_to_numeric = self.query_context.auto_string_to_numeric();
225
226        let mut value_rows = Vec::with_capacity(sql_rows.len());
227        for sql_row in sql_rows {
228            let mut value_row = Vec::with_capacity(self.insert_columns.len());
229
230            for (insert_column, sql_value) in self.insert_columns.iter().zip(sql_row) {
231                let value =
232                    sql_value_to_value(insert_column, sql_value, timezone, auto_string_to_numeric)?;
233                value_row.push(value);
234            }
235            value_rows.push(value_row);
236        }
237        Ok(value_rows)
238    }
239}
240
241fn column_names<'a>(stmt: &'a Insert, table_schema: &'a SchemaRef) -> Vec<&'a String> {
242    if !stmt.columns().is_empty() {
243        stmt.columns()
244    } else {
245        table_schema
246            .column_schemas()
247            .iter()
248            .map(|column| &column.name)
249            .collect()
250    }
251}
252
253/// Converts SQL value to gRPC value according to the column schema.
254/// If `auto_string_to_numeric` is true, tries to cast the string value to numeric values,
255/// and fills the default value if the cast fails.
256fn sql_value_to_value(
257    column_schema: &ColumnSchema,
258    sql_val: &SqlValue,
259    timezone: Option<&Timezone>,
260    auto_string_to_numeric: bool,
261) -> Result<Value> {
262    let column = &column_schema.name;
263    let value = if replace_default(sql_val) {
264        let default_value = column_schema
265            .create_default()
266            .context(ColumnDefaultValueSnafu {
267                column: column.clone(),
268            })?;
269
270        default_value.context(ColumnNoneDefaultValueSnafu {
271            column: column.clone(),
272        })?
273    } else {
274        common_sql::convert::sql_value_to_value(
275            column_schema,
276            sql_val,
277            timezone,
278            None,
279            auto_string_to_numeric,
280        )
281        .context(crate::error::SqlCommonSnafu)?
282    };
283    validate(&value)?;
284    Ok(value)
285}
286
287fn validate(value: &Value) -> Result<()> {
288    match value {
289        Value::Json(value) => {
290            // Json object will be stored as Arrow struct in parquet, and it has the restriction:
291            // "Parquet does not support writing empty structs".
292            ensure!(
293                !value.is_empty_object(),
294                InvalidInsertRequestSnafu {
295                    reason: "empty json object is not supported, consider adding a dummy field"
296                }
297            );
298            Ok(())
299        }
300        _ => Ok(()),
301    }
302}
303
304fn replace_default(sql_val: &SqlValue) -> bool {
305    matches!(sql_val, SqlValue::Placeholder(s) if s.to_lowercase() == DEFAULT_PLACEHOLDER_VALUE)
306}