operator/req_convert/insert/stmt_to_region.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use api::helper::{value_to_grpc_value, ColumnDataTypeWrapper};
16use api::v1::column_def::options_from_column_schema;
17use api::v1::region::InsertRequests as RegionInsertRequests;
18use api::v1::{ColumnSchema as GrpcColumnSchema, Row, Rows, Value as GrpcValue};
19use catalog::CatalogManager;
20use common_time::Timezone;
21use datatypes::schema::{ColumnSchema, SchemaRef};
22use partition::manager::PartitionRuleManager;
23use session::context::{QueryContext, QueryContextRef};
24use snafu::{ensure, OptionExt, ResultExt};
25use sql::statements;
26use sql::statements::insert::Insert;
27use sqlparser::ast::{ObjectName, Value as SqlValue};
28use table::metadata::TableInfoRef;
29use table::TableRef;
30
31use crate::error::{
32    CatalogSnafu, ColumnDataTypeSnafu, ColumnDefaultValueSnafu, ColumnNoneDefaultValueSnafu,
33    ColumnNotFoundSnafu, InvalidSqlSnafu, MissingInsertBodySnafu, ParseSqlSnafu, Result,
34    SchemaReadOnlySnafu, TableNotFoundSnafu,
35};
36use crate::insert::InstantAndNormalInsertRequests;
37use crate::req_convert::common::partitioner::Partitioner;
38use crate::req_convert::insert::semantic_type;
39
/// Placeholder text that marks a `VALUES` cell to be filled with the column's
/// default value; `replace_default` compares a `SqlValue::Placeholder` against
/// it case-insensitively.
const DEFAULT_PLACEHOLDER_VALUE: &str = "default";
41
42pub struct StatementToRegion<'a> {
43    catalog_manager: &'a dyn CatalogManager,
44    partition_manager: &'a PartitionRuleManager,
45    ctx: &'a QueryContext,
46}
47
48impl<'a> StatementToRegion<'a> {
49    pub fn new(
50        catalog_manager: &'a dyn CatalogManager,
51        partition_manager: &'a PartitionRuleManager,
52        ctx: &'a QueryContext,
53    ) -> Self {
54        Self {
55            catalog_manager,
56            partition_manager,
57            ctx,
58        }
59    }
60
61    pub async fn convert(
62        &self,
63        stmt: &Insert,
64        query_ctx: &QueryContextRef,
65    ) -> Result<(InstantAndNormalInsertRequests, TableInfoRef)> {
66        let name = stmt.table_name().context(ParseSqlSnafu)?;
67        let (catalog, schema, table_name) = self.get_full_name(name)?;
68        let table = self.get_table(&catalog, &schema, &table_name).await?;
69        let table_schema = table.schema();
70        let table_info = table.table_info();
71
72        ensure!(
73            !common_catalog::consts::is_readonly_schema(&schema),
74            SchemaReadOnlySnafu { name: schema }
75        );
76
77        let column_names = column_names(stmt, &table_schema);
78        let column_count = column_names.len();
79
80        let sql_rows = stmt.values_body().context(MissingInsertBodySnafu)?;
81        let row_count = sql_rows.len();
82
83        sql_rows.iter().try_for_each(|r| {
84            ensure!(
85                r.len() == column_count,
86                InvalidSqlSnafu {
87                    err_msg: format!(
88                        "column count mismatch, columns: {}, values: {}",
89                        column_count,
90                        r.len()
91                    )
92                }
93            );
94            Ok(())
95        })?;
96
97        let mut schema = Vec::with_capacity(column_count);
98        let mut rows = vec![
99            Row {
100                values: Vec::with_capacity(column_count)
101            };
102            row_count
103        ];
104
105        for (i, column_name) in column_names.into_iter().enumerate() {
106            let column_schema = table_schema
107                .column_schema_by_name(column_name)
108                .with_context(|| ColumnNotFoundSnafu {
109                    msg: format!("Column {} not found in table {}", column_name, &table_name),
110                })?;
111
112            let (datatype, datatype_extension) =
113                ColumnDataTypeWrapper::try_from(column_schema.data_type.clone())
114                    .context(ColumnDataTypeSnafu)?
115                    .to_parts();
116            let semantic_type = semantic_type(&table_info, column_name)?;
117
118            let grpc_column_schema = GrpcColumnSchema {
119                column_name: column_name.clone(),
120                datatype: datatype.into(),
121                semantic_type: semantic_type.into(),
122                datatype_extension,
123                options: options_from_column_schema(column_schema),
124            };
125            schema.push(grpc_column_schema);
126
127            for (sql_row, grpc_row) in sql_rows.iter().zip(rows.iter_mut()) {
128                let value = sql_value_to_grpc_value(
129                    column_schema,
130                    &sql_row[i],
131                    Some(&query_ctx.timezone()),
132                    query_ctx.auto_string_to_numeric(),
133                )?;
134                grpc_row.values.push(value);
135            }
136        }
137
138        let requests = Partitioner::new(self.partition_manager)
139            .partition_insert_requests(table_info.table_id(), Rows { schema, rows })
140            .await?;
141        let requests = RegionInsertRequests { requests };
142        if table_info.is_ttl_instant_table() {
143            Ok((
144                InstantAndNormalInsertRequests {
145                    normal_requests: Default::default(),
146                    instant_requests: requests,
147                },
148                table_info,
149            ))
150        } else {
151            Ok((
152                InstantAndNormalInsertRequests {
153                    normal_requests: requests,
154                    instant_requests: Default::default(),
155                },
156                table_info,
157            ))
158        }
159    }
160
161    async fn get_table(&self, catalog: &str, schema: &str, table: &str) -> Result<TableRef> {
162        self.catalog_manager
163            .table(catalog, schema, table, None)
164            .await
165            .context(CatalogSnafu)?
166            .with_context(|| TableNotFoundSnafu {
167                table_name: common_catalog::format_full_table_name(catalog, schema, table),
168            })
169    }
170
171    fn get_full_name(&self, obj_name: &ObjectName) -> Result<(String, String, String)> {
172        match &obj_name.0[..] {
173            [table] => Ok((
174                self.ctx.current_catalog().to_owned(),
175                self.ctx.current_schema(),
176                table.value.clone(),
177            )),
178            [schema, table] => Ok((
179                self.ctx.current_catalog().to_owned(),
180                schema.value.clone(),
181                table.value.clone(),
182            )),
183            [catalog, schema, table] => Ok((
184                catalog.value.clone(),
185                schema.value.clone(),
186                table.value.clone(),
187            )),
188            _ => InvalidSqlSnafu {
189                err_msg: format!(
190                    "expect table name to be <catalog>.<schema>.<table>, <schema>.<table> or <table>, actual: {obj_name}",
191                ),
192            }.fail(),
193        }
194    }
195}
196
197fn column_names<'a>(stmt: &'a Insert, table_schema: &'a SchemaRef) -> Vec<&'a String> {
198    if !stmt.columns().is_empty() {
199        stmt.columns()
200    } else {
201        table_schema
202            .column_schemas()
203            .iter()
204            .map(|column| &column.name)
205            .collect()
206    }
207}
208
209/// Converts SQL value to gRPC value according to the column schema.
210/// If `auto_string_to_numeric` is true, tries to cast the string value to numeric values,
211/// and fills the default value if the cast fails.
212fn sql_value_to_grpc_value(
213    column_schema: &ColumnSchema,
214    sql_val: &SqlValue,
215    timezone: Option<&Timezone>,
216    auto_string_to_numeric: bool,
217) -> Result<GrpcValue> {
218    let column = &column_schema.name;
219    let value = if replace_default(sql_val) {
220        let default_value = column_schema
221            .create_default()
222            .context(ColumnDefaultValueSnafu {
223                column: column.clone(),
224            })?;
225
226        default_value.context(ColumnNoneDefaultValueSnafu {
227            column: column.clone(),
228        })?
229    } else {
230        statements::sql_value_to_value(
231            column,
232            &column_schema.data_type,
233            sql_val,
234            timezone,
235            None,
236            auto_string_to_numeric,
237        )
238        .context(ParseSqlSnafu)?
239    };
240
241    let grpc_value = value_to_grpc_value(value);
242    Ok(grpc_value)
243}
244
245fn replace_default(sql_val: &SqlValue) -> bool {
246    matches!(sql_val, SqlValue::Placeholder(s) if s.to_lowercase() == DEFAULT_PLACEHOLDER_VALUE)
247}