operator/req_convert/insert/
stmt_to_region.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use api::helper::{value_to_grpc_value, ColumnDataTypeWrapper};
16use api::v1::column_def::options_from_column_schema;
17use api::v1::region::InsertRequests as RegionInsertRequests;
18use api::v1::{ColumnSchema as GrpcColumnSchema, Row, Rows, Value as GrpcValue};
19use catalog::CatalogManager;
20use common_time::Timezone;
21use datatypes::schema::{ColumnSchema, SchemaRef};
22use partition::manager::PartitionRuleManager;
23use session::context::{QueryContext, QueryContextRef};
24use snafu::{ensure, OptionExt, ResultExt};
25use sql::statements::insert::Insert;
26use sqlparser::ast::{ObjectName, Value as SqlValue};
27use table::metadata::TableInfoRef;
28use table::TableRef;
29
30use crate::error::{
31    CatalogSnafu, ColumnDataTypeSnafu, ColumnDefaultValueSnafu, ColumnNoneDefaultValueSnafu,
32    ColumnNotFoundSnafu, InvalidSqlSnafu, MissingInsertBodySnafu, ParseSqlSnafu, Result,
33    SchemaReadOnlySnafu, TableNotFoundSnafu,
34};
35use crate::insert::InstantAndNormalInsertRequests;
36use crate::req_convert::common::partitioner::Partitioner;
37use crate::req_convert::insert::semantic_type;
38
/// Lowercase keyword recognised as the `DEFAULT` placeholder in an `INSERT ... VALUES` row.
/// `replace_default` compares a SQL placeholder against it case-insensitively, and a match
/// makes `sql_value_to_grpc_value` substitute the column's default value.
39const DEFAULT_PLACEHOLDER_VALUE: &str = "default";
40
41pub struct StatementToRegion<'a> {
42    catalog_manager: &'a dyn CatalogManager,
43    partition_manager: &'a PartitionRuleManager,
44    ctx: &'a QueryContext,
45}
46
47impl<'a> StatementToRegion<'a> {
48    pub fn new(
49        catalog_manager: &'a dyn CatalogManager,
50        partition_manager: &'a PartitionRuleManager,
51        ctx: &'a QueryContext,
52    ) -> Self {
53        Self {
54            catalog_manager,
55            partition_manager,
56            ctx,
57        }
58    }
59
60    pub async fn convert(
61        &self,
62        stmt: &Insert,
63        query_ctx: &QueryContextRef,
64    ) -> Result<(InstantAndNormalInsertRequests, TableInfoRef)> {
65        let name = stmt.table_name().context(ParseSqlSnafu)?;
66        let (catalog, schema, table_name) = self.get_full_name(name)?;
67        let table = self.get_table(&catalog, &schema, &table_name).await?;
68        let table_schema = table.schema();
69        let table_info = table.table_info();
70
71        ensure!(
72            !common_catalog::consts::is_readonly_schema(&schema),
73            SchemaReadOnlySnafu { name: schema }
74        );
75
76        let column_names = column_names(stmt, &table_schema);
77        let column_count = column_names.len();
78
79        let sql_rows = stmt.values_body().context(MissingInsertBodySnafu)?;
80        let row_count = sql_rows.len();
81
82        sql_rows.iter().try_for_each(|r| {
83            ensure!(
84                r.len() == column_count,
85                InvalidSqlSnafu {
86                    err_msg: format!(
87                        "column count mismatch, columns: {}, values: {}",
88                        column_count,
89                        r.len()
90                    )
91                }
92            );
93            Ok(())
94        })?;
95
96        let mut schema = Vec::with_capacity(column_count);
97        let mut rows = vec![
98            Row {
99                values: Vec::with_capacity(column_count)
100            };
101            row_count
102        ];
103
104        for (i, column_name) in column_names.into_iter().enumerate() {
105            let column_schema = table_schema
106                .column_schema_by_name(column_name)
107                .with_context(|| ColumnNotFoundSnafu {
108                    msg: format!("Column {} not found in table {}", column_name, &table_name),
109                })?;
110
111            let (datatype, datatype_extension) =
112                ColumnDataTypeWrapper::try_from(column_schema.data_type.clone())
113                    .context(ColumnDataTypeSnafu)?
114                    .to_parts();
115            let semantic_type = semantic_type(&table_info, column_name)?;
116
117            let grpc_column_schema = GrpcColumnSchema {
118                column_name: column_name.clone(),
119                datatype: datatype.into(),
120                semantic_type: semantic_type.into(),
121                datatype_extension,
122                options: options_from_column_schema(column_schema),
123            };
124            schema.push(grpc_column_schema);
125
126            for (sql_row, grpc_row) in sql_rows.iter().zip(rows.iter_mut()) {
127                let value = sql_value_to_grpc_value(
128                    column_schema,
129                    &sql_row[i],
130                    Some(&query_ctx.timezone()),
131                    query_ctx.auto_string_to_numeric(),
132                )?;
133                grpc_row.values.push(value);
134            }
135        }
136
137        let requests = Partitioner::new(self.partition_manager)
138            .partition_insert_requests(&table_info, Rows { schema, rows })
139            .await?;
140        let requests = RegionInsertRequests { requests };
141        if table_info.is_ttl_instant_table() {
142            Ok((
143                InstantAndNormalInsertRequests {
144                    normal_requests: Default::default(),
145                    instant_requests: requests,
146                },
147                table_info,
148            ))
149        } else {
150            Ok((
151                InstantAndNormalInsertRequests {
152                    normal_requests: requests,
153                    instant_requests: Default::default(),
154                },
155                table_info,
156            ))
157        }
158    }
159
160    async fn get_table(&self, catalog: &str, schema: &str, table: &str) -> Result<TableRef> {
161        self.catalog_manager
162            .table(catalog, schema, table, None)
163            .await
164            .context(CatalogSnafu)?
165            .with_context(|| TableNotFoundSnafu {
166                table_name: common_catalog::format_full_table_name(catalog, schema, table),
167            })
168    }
169
170    fn get_full_name(&self, obj_name: &ObjectName) -> Result<(String, String, String)> {
171        match &obj_name.0[..] {
172            [table] => Ok((
173                self.ctx.current_catalog().to_owned(),
174                self.ctx.current_schema(),
175                table.value.clone(),
176            )),
177            [schema, table] => Ok((
178                self.ctx.current_catalog().to_owned(),
179                schema.value.clone(),
180                table.value.clone(),
181            )),
182            [catalog, schema, table] => Ok((
183                catalog.value.clone(),
184                schema.value.clone(),
185                table.value.clone(),
186            )),
187            _ => InvalidSqlSnafu {
188                err_msg: format!(
189                    "expect table name to be <catalog>.<schema>.<table>, <schema>.<table> or <table>, actual: {obj_name}",
190                ),
191            }.fail(),
192        }
193    }
194}
195
196fn column_names<'a>(stmt: &'a Insert, table_schema: &'a SchemaRef) -> Vec<&'a String> {
197    if !stmt.columns().is_empty() {
198        stmt.columns()
199    } else {
200        table_schema
201            .column_schemas()
202            .iter()
203            .map(|column| &column.name)
204            .collect()
205    }
206}
207
208/// Converts SQL value to gRPC value according to the column schema.
209/// If `auto_string_to_numeric` is true, tries to cast the string value to numeric values,
210/// and fills the default value if the cast fails.
211fn sql_value_to_grpc_value(
212    column_schema: &ColumnSchema,
213    sql_val: &SqlValue,
214    timezone: Option<&Timezone>,
215    auto_string_to_numeric: bool,
216) -> Result<GrpcValue> {
217    let column = &column_schema.name;
218    let value = if replace_default(sql_val) {
219        let default_value = column_schema
220            .create_default()
221            .context(ColumnDefaultValueSnafu {
222                column: column.clone(),
223            })?;
224
225        default_value.context(ColumnNoneDefaultValueSnafu {
226            column: column.clone(),
227        })?
228    } else {
229        common_sql::convert::sql_value_to_value(
230            column,
231            &column_schema.data_type,
232            sql_val,
233            timezone,
234            None,
235            auto_string_to_numeric,
236        )
237        .context(crate::error::SqlCommonSnafu)?
238    };
239
240    let grpc_value = value_to_grpc_value(value);
241    Ok(grpc_value)
242}
243
244fn replace_default(sql_val: &SqlValue) -> bool {
245    matches!(sql_val, SqlValue::Placeholder(s) if s.to_lowercase() == DEFAULT_PLACEHOLDER_VALUE)
246}