operator/req_convert/insert/
stmt_to_region.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::cell::LazyCell;
16use std::collections::HashMap;
17
18use api::helper::{ColumnDataTypeWrapper, to_grpc_value};
19use api::v1::alter_table_expr::Kind;
20use api::v1::column_def::options_from_column_schema;
21use api::v1::region::InsertRequests as RegionInsertRequests;
22use api::v1::{
23    AlterTableExpr, ColumnSchema as GrpcColumnSchema, ModifyColumnType, ModifyColumnTypes, Row,
24    Rows,
25};
26use catalog::CatalogManager;
27use common_telemetry::info;
28use common_time::Timezone;
29use datatypes::data_type::ConcreteDataType;
30use datatypes::schema::{ColumnSchema, SchemaRef};
31use datatypes::types::JsonType;
32use datatypes::value::Value;
33use partition::manager::PartitionRuleManager;
34use session::context::{QueryContext, QueryContextRef};
35use snafu::{OptionExt, ResultExt, ensure};
36use sql::ast::ObjectNamePartExt;
37use sql::statements::insert::Insert;
38use sqlparser::ast::{ObjectName, Value as SqlValue};
39use table::TableRef;
40use table::metadata::TableInfoRef;
41
42use crate::error::{
43    CatalogSnafu, ColumnDataTypeSnafu, ColumnDefaultValueSnafu, ColumnNoneDefaultValueSnafu,
44    ColumnNotFoundSnafu, InvalidInsertRequestSnafu, InvalidSqlSnafu, MissingInsertBodySnafu,
45    ParseSqlSnafu, Result, SchemaReadOnlySnafu, TableNotFoundSnafu,
46};
47use crate::insert::InstantAndNormalInsertRequests;
48use crate::req_convert::common::partitioner::Partitioner;
49use crate::req_convert::insert::semantic_type;
50use crate::statement::StatementExecutor;
51
52const DEFAULT_PLACEHOLDER_VALUE: &str = "default";
53
/// Converts an SQL `INSERT ... VALUES ...` statement into region-level insert
/// requests: resolves the target table through the catalog, converts the SQL
/// literals to rows, and partitions them into per-region requests.
pub struct StatementToRegion<'a> {
    // Resolves `(catalog, schema, table)` names to `TableRef`s.
    catalog_manager: &'a dyn CatalogManager,
    // Splits converted rows into per-region insert requests.
    partition_manager: &'a PartitionRuleManager,
    // Session context; supplies the default catalog/schema for name resolution.
    ctx: &'a QueryContext,
}
59
impl<'a> StatementToRegion<'a> {
    /// Creates a converter borrowing the catalog manager, the partition rule
    /// manager and the session context.
    pub fn new(
        catalog_manager: &'a dyn CatalogManager,
        partition_manager: &'a PartitionRuleManager,
        ctx: &'a QueryContext,
    ) -> Self {
        Self {
            catalog_manager,
            partition_manager,
            ctx,
        }
    }

    /// Converts the `INSERT ... VALUES ...` statement into partitioned region
    /// insert requests, returned together with the target table's metadata.
    ///
    /// Fails when the table or a referenced column does not exist, when the
    /// schema is read-only, when a VALUES row's length does not match the
    /// insert column count, or when a value cannot be converted. As a side
    /// effect, it may execute an `ALTER TABLE` to widen a JSON column's type
    /// before building the requests (see [`JsonColumnTypeUpdater`]).
    pub async fn convert(
        &self,
        stmt: &Insert,
        query_ctx: &QueryContextRef,
        statement_executor: &StatementExecutor,
    ) -> Result<(InstantAndNormalInsertRequests, TableInfoRef)> {
        let name = stmt.table_name().context(ParseSqlSnafu)?;
        let (catalog, schema, table_name) = self.get_full_name(name)?;
        // `table` is mutable because it is re-fetched below if a JSON column
        // type gets altered.
        let mut table = self.get_table(&catalog, &schema, &table_name).await?;
        let table_schema = table.schema();

        ensure!(
            !common_catalog::consts::is_readonly_schema(&schema),
            SchemaReadOnlySnafu { name: schema }
        );

        let column_names = column_names(stmt, &table_schema);
        let column_count = column_names.len();

        let sql_rows = stmt.values_body().context(MissingInsertBodySnafu)?;
        let row_count = sql_rows.len();

        // Every VALUES row must supply exactly one value per insert column.
        sql_rows.iter().try_for_each(|r| {
            ensure!(
                r.len() == column_count,
                InvalidSqlSnafu {
                    err_msg: format!(
                        "column count mismatch, columns: {}, values: {}",
                        column_count,
                        r.len()
                    )
                }
            );
            Ok(())
        })?;

        // Pre-allocate one (empty) GRPC row per VALUES row; filled in below.
        let mut rows = vec![
            Row {
                values: Vec::with_capacity(column_count)
            };
            row_count
        ];

        // Resolves the insert column names against the given table's schema;
        // errors on unknown columns. A local fn (not a closure) so it can be
        // called again after the table is re-fetched.
        fn find_insert_columns<'a>(
            table: &'a TableRef,
            column_names: &[&String],
        ) -> Result<Vec<&'a ColumnSchema>> {
            let schema = table.schema_ref();
            column_names
                .iter()
                .map(|name| {
                    schema
                        .column_schema_by_name(name)
                        .context(ColumnNotFoundSnafu { msg: *name })
                })
                .collect::<Result<Vec<_>>>()
        }

        let mut insert_columns = find_insert_columns(&table, &column_names)?;
        let converter = SqlRowConverter::new(&insert_columns, query_ctx);

        // Convert the SQL values to GreptimeDB values, and merge a "largest" JSON types of all
        // values on the way by `JsonColumnTypeUpdater`.
        let mut updater = JsonColumnTypeUpdater::new(statement_executor, query_ctx);
        let value_rows = converter.convert(&mut updater, &sql_rows)?;

        // If the JSON values have a "larger" json type than the one in the table schema, modify
        // the column's json type first, by executing an "alter table" DDL.
        if updater
            .maybe_update_column_type(&catalog, &schema, &table_name, &insert_columns)
            .await?
        {
            // Update with the latest schema, if changed.
            table = self.get_table(&catalog, &schema, &table_name).await?;
            insert_columns = find_insert_columns(&table, &column_names)?;
        }

        // Finally convert GreptimeDB values to GRPC values, ready to do insertion on Datanode.
        for (i, row) in value_rows.into_iter().enumerate() {
            for value in row {
                let grpc_value = to_grpc_value(value);
                rows[i].values.push(grpc_value);
            }
        }

        let table_info = table.table_info();
        // Build the GRPC column schemas matching the insert columns
        // (note: shadows the `schema` name string, which is no longer needed).
        let mut schema = Vec::with_capacity(column_count);
        for column_schema in insert_columns {
            let (datatype, datatype_extension) =
                ColumnDataTypeWrapper::try_from(column_schema.data_type.clone())
                    .context(ColumnDataTypeSnafu)?
                    .to_parts();

            let column_name = &column_schema.name;
            let semantic_type = semantic_type(&table_info, column_name)?;

            let grpc_column_schema = GrpcColumnSchema {
                column_name: column_name.clone(),
                datatype: datatype.into(),
                semantic_type: semantic_type.into(),
                datatype_extension,
                options: options_from_column_schema(column_schema),
            };
            schema.push(grpc_column_schema);
        }

        let requests = Partitioner::new(self.partition_manager)
            .partition_insert_requests(&table_info, Rows { schema, rows })
            .await?;
        let requests = RegionInsertRequests { requests };
        // Route requests for "instant"-TTL tables to the instant set so the
        // caller can handle them separately from normal inserts.
        if table_info.is_ttl_instant_table() {
            Ok((
                InstantAndNormalInsertRequests {
                    normal_requests: Default::default(),
                    instant_requests: requests,
                },
                table_info,
            ))
        } else {
            Ok((
                InstantAndNormalInsertRequests {
                    normal_requests: requests,
                    instant_requests: Default::default(),
                },
                table_info,
            ))
        }
    }

    /// Looks the table up in the catalog, erroring with `TableNotFound`
    /// (carrying the fully-qualified name) when it does not exist.
    async fn get_table(&self, catalog: &str, schema: &str, table: &str) -> Result<TableRef> {
        self.catalog_manager
            .table(catalog, schema, table, None)
            .await
            .context(CatalogSnafu)?
            .with_context(|| TableNotFoundSnafu {
                table_name: common_catalog::format_full_table_name(catalog, schema, table),
            })
    }

    /// Resolves an object name into `(catalog, schema, table)`, filling in
    /// missing parts from the session context. Accepts one to three name
    /// parts; anything else is an `InvalidSql` error.
    fn get_full_name(&self, obj_name: &ObjectName) -> Result<(String, String, String)> {
        match &obj_name.0[..] {
            [table] => Ok((
                self.ctx.current_catalog().to_owned(),
                self.ctx.current_schema(),
                table.to_string_unquoted(),
            )),
            [schema, table] => Ok((
                self.ctx.current_catalog().to_owned(),
                schema.to_string_unquoted(),
                table.to_string_unquoted(),
            )),
            [catalog, schema, table] => Ok((
                catalog.to_string_unquoted(),
                schema.to_string_unquoted(),
                table.to_string_unquoted(),
            )),
            _ => InvalidSqlSnafu {
                err_msg: format!(
                    "expect table name to be <catalog>.<schema>.<table>, <schema>.<table> or <table>, actual: {obj_name}",
                ),
            }.fail(),
        }
    }
}
237
/// Converts SQL literal rows into GreptimeDB [`Value`] rows for a fixed set
/// of insert columns, using session settings from the query context.
struct SqlRowConverter<'a, 'b> {
    // Target column schemas, one per value in each row, in insert order.
    insert_columns: &'a [&'a ColumnSchema],
    // Supplies the session timezone and the auto string-to-numeric setting.
    query_context: &'b QueryContextRef,
}
242
243impl<'a, 'b> SqlRowConverter<'a, 'b> {
244    fn new(insert_columns: &'a [&'a ColumnSchema], query_context: &'b QueryContextRef) -> Self {
245        Self {
246            insert_columns,
247            query_context,
248        }
249    }
250
251    fn convert(
252        &self,
253        updater: &mut JsonColumnTypeUpdater<'_, 'a>,
254        sql_rows: &[Vec<SqlValue>],
255    ) -> Result<Vec<Vec<Value>>> {
256        let timezone = Some(&self.query_context.timezone());
257        let auto_string_to_numeric = self.query_context.auto_string_to_numeric();
258
259        let mut value_rows = Vec::with_capacity(sql_rows.len());
260        for sql_row in sql_rows {
261            let mut value_row = Vec::with_capacity(self.insert_columns.len());
262
263            for (insert_column, sql_value) in self.insert_columns.iter().zip(sql_row) {
264                let value =
265                    sql_value_to_value(insert_column, sql_value, timezone, auto_string_to_numeric)?;
266
267                updater.merge_types(insert_column, &value)?;
268
269                value_row.push(value);
270            }
271            value_rows.push(value_row);
272        }
273        Ok(value_rows)
274    }
275}
276
/// Accumulates, per JSON column, the merged ("largest") JSON type of all
/// inserted values, and can alter the table to widen a column's JSON type
/// when the merged type exceeds the one in the table schema.
struct JsonColumnTypeUpdater<'a, 'b> {
    // Executes the `ALTER TABLE` DDL when a column type must be widened.
    statement_executor: &'a StatementExecutor,
    query_context: &'a QueryContextRef,
    // Lazily-created map from column name (borrowed from the column schema)
    // to the merged JSON type of all values seen for that column.
    merged_value_types: LazyCell<HashMap<&'b str, JsonType>>,
}
282
impl<'a, 'b> JsonColumnTypeUpdater<'a, 'b> {
    /// Creates an updater with an (initially empty, lazily built) merged-type
    /// map.
    fn new(statement_executor: &'a StatementExecutor, query_context: &'a QueryContextRef) -> Self {
        Self {
            statement_executor,
            query_context,
            merged_value_types: LazyCell::new(Default::default),
        }
    }

    /// Folds `value`'s JSON type into the merged type recorded for this
    /// column. Non-JSON values are ignored. Errors when the value's type
    /// cannot be merged with the accumulated one.
    fn merge_types(&mut self, column_schema: &'b ColumnSchema, value: &Value) -> Result<()> {
        if !matches!(value, Value::Json(_)) {
            return Ok(());
        }

        if let ConcreteDataType::Json(value_type) = value.data_type() {
            // First JSON value for this column seeds the merged type.
            let merged_type = self
                .merged_value_types
                .entry(&column_schema.name)
                .or_insert_with(|| value_type.clone());

            // Only merge when the accumulated type doesn't already cover it.
            if !merged_type.is_include(&value_type) {
                merged_type.merge(&value_type).map_err(|e| {
                    InvalidInsertRequestSnafu {
                        reason: format!(r#"cannot merge "{value_type}" into "{merged_type}": {e}"#),
                    }
                    .build()
                })?;
            }
        }
        Ok(())
    }

    /// For each insert column whose merged value type is not already covered
    /// by the column's JSON type in the table schema, executes an
    /// `ALTER TABLE ... MODIFY COLUMN` widening it to the merged type.
    ///
    /// Returns `true` if any column was altered, so the caller knows to
    /// re-fetch the table schema. Consumes `self`: the updater is only valid
    /// for one batch of converted values.
    async fn maybe_update_column_type(
        self,
        catalog: &str,
        schema: &str,
        table: &str,
        insert_columns: &[&ColumnSchema],
    ) -> Result<bool> {
        let mut has_update = false;
        for (column_name, merged_type) in self.merged_value_types.iter() {
            // Skip columns that aren't JSON (or aren't in the insert list).
            let Some(column_type) = insert_columns
                .iter()
                .find_map(|x| (&x.name == column_name).then(|| x.data_type.as_json()))
                .flatten()
            else {
                continue;
            };
            if column_type.is_include(merged_type) {
                continue;
            }

            // Widen the existing column type with the merged value type.
            let new_column_type = {
                let mut x = column_type.clone();
                x.merge(merged_type)
                    .map_err(|e| {
                        InvalidInsertRequestSnafu {
                            reason: format!(
                                r#"cannot merge "{merged_type}" into "{column_type}": {e}"#
                            ),
                        }
                        .build()
                    })
                    .map(|()| x)
            }?;
            info!(
                "updating table {}.{}.{} column {} json type: {} => {}",
                catalog, schema, table, column_name, column_type, new_column_type,
            );

            let (target_type, target_type_extension) =
                ColumnDataTypeWrapper::try_from(ConcreteDataType::Json(new_column_type))
                    .context(ColumnDataTypeSnafu)?
                    .into_parts();
            let alter_expr = AlterTableExpr {
                catalog_name: catalog.to_string(),
                schema_name: schema.to_string(),
                table_name: table.to_string(),
                kind: Some(Kind::ModifyColumnTypes(ModifyColumnTypes {
                    modify_column_types: vec![ModifyColumnType {
                        column_name: column_name.to_string(),
                        target_type: target_type as i32,
                        target_type_extension,
                    }],
                })),
            };
            self.statement_executor
                .alter_table_inner(alter_expr, self.query_context.clone())
                .await?;

            has_update = true;
        }
        Ok(has_update)
    }
}
378
379fn column_names<'a>(stmt: &'a Insert, table_schema: &'a SchemaRef) -> Vec<&'a String> {
380    if !stmt.columns().is_empty() {
381        stmt.columns()
382    } else {
383        table_schema
384            .column_schemas()
385            .iter()
386            .map(|column| &column.name)
387            .collect()
388    }
389}
390
391/// Converts SQL value to gRPC value according to the column schema.
392/// If `auto_string_to_numeric` is true, tries to cast the string value to numeric values,
393/// and fills the default value if the cast fails.
394fn sql_value_to_value(
395    column_schema: &ColumnSchema,
396    sql_val: &SqlValue,
397    timezone: Option<&Timezone>,
398    auto_string_to_numeric: bool,
399) -> Result<Value> {
400    let column = &column_schema.name;
401    let value = if replace_default(sql_val) {
402        let default_value = column_schema
403            .create_default()
404            .context(ColumnDefaultValueSnafu {
405                column: column.clone(),
406            })?;
407
408        default_value.context(ColumnNoneDefaultValueSnafu {
409            column: column.clone(),
410        })?
411    } else {
412        common_sql::convert::sql_value_to_value(
413            column,
414            &column_schema.data_type,
415            sql_val,
416            timezone,
417            None,
418            auto_string_to_numeric,
419        )
420        .context(crate::error::SqlCommonSnafu)?
421    };
422    validate(&value)?;
423    Ok(value)
424}
425
426fn validate(value: &Value) -> Result<()> {
427    match value {
428        Value::Json(value) => {
429            // Json object will be stored as Arrow struct in parquet, and it has the restriction:
430            // "Parquet does not support writing empty structs".
431            ensure!(
432                !value.is_empty_object(),
433                InvalidInsertRequestSnafu {
434                    reason: "empty json object is not supported, consider adding a dummy field"
435                }
436            );
437            Ok(())
438        }
439        _ => Ok(()),
440    }
441}
442
443fn replace_default(sql_val: &SqlValue) -> bool {
444    matches!(sql_val, SqlValue::Placeholder(s) if s.to_lowercase() == DEFAULT_PLACEHOLDER_VALUE)
445}