operator/req_convert/insert/
stmt_to_region.rs1use api::helper::{value_to_grpc_value, ColumnDataTypeWrapper};
16use api::v1::column_def::options_from_column_schema;
17use api::v1::region::InsertRequests as RegionInsertRequests;
18use api::v1::{ColumnSchema as GrpcColumnSchema, Row, Rows, Value as GrpcValue};
19use catalog::CatalogManager;
20use common_time::Timezone;
21use datatypes::schema::{ColumnSchema, SchemaRef};
22use partition::manager::PartitionRuleManager;
23use session::context::{QueryContext, QueryContextRef};
24use snafu::{ensure, OptionExt, ResultExt};
25use sql::statements::insert::Insert;
26use sqlparser::ast::{ObjectName, Value as SqlValue};
27use table::metadata::TableInfoRef;
28use table::TableRef;
29
30use crate::error::{
31 CatalogSnafu, ColumnDataTypeSnafu, ColumnDefaultValueSnafu, ColumnNoneDefaultValueSnafu,
32 ColumnNotFoundSnafu, InvalidSqlSnafu, MissingInsertBodySnafu, ParseSqlSnafu, Result,
33 SchemaReadOnlySnafu, TableNotFoundSnafu,
34};
35use crate::insert::InstantAndNormalInsertRequests;
36use crate::req_convert::common::partitioner::Partitioner;
37use crate::req_convert::insert::semantic_type;
38
39const DEFAULT_PLACEHOLDER_VALUE: &str = "default";
40
41pub struct StatementToRegion<'a> {
42 catalog_manager: &'a dyn CatalogManager,
43 partition_manager: &'a PartitionRuleManager,
44 ctx: &'a QueryContext,
45}
46
47impl<'a> StatementToRegion<'a> {
48 pub fn new(
49 catalog_manager: &'a dyn CatalogManager,
50 partition_manager: &'a PartitionRuleManager,
51 ctx: &'a QueryContext,
52 ) -> Self {
53 Self {
54 catalog_manager,
55 partition_manager,
56 ctx,
57 }
58 }
59
60 pub async fn convert(
61 &self,
62 stmt: &Insert,
63 query_ctx: &QueryContextRef,
64 ) -> Result<(InstantAndNormalInsertRequests, TableInfoRef)> {
65 let name = stmt.table_name().context(ParseSqlSnafu)?;
66 let (catalog, schema, table_name) = self.get_full_name(name)?;
67 let table = self.get_table(&catalog, &schema, &table_name).await?;
68 let table_schema = table.schema();
69 let table_info = table.table_info();
70
71 ensure!(
72 !common_catalog::consts::is_readonly_schema(&schema),
73 SchemaReadOnlySnafu { name: schema }
74 );
75
76 let column_names = column_names(stmt, &table_schema);
77 let column_count = column_names.len();
78
79 let sql_rows = stmt.values_body().context(MissingInsertBodySnafu)?;
80 let row_count = sql_rows.len();
81
82 sql_rows.iter().try_for_each(|r| {
83 ensure!(
84 r.len() == column_count,
85 InvalidSqlSnafu {
86 err_msg: format!(
87 "column count mismatch, columns: {}, values: {}",
88 column_count,
89 r.len()
90 )
91 }
92 );
93 Ok(())
94 })?;
95
96 let mut schema = Vec::with_capacity(column_count);
97 let mut rows = vec![
98 Row {
99 values: Vec::with_capacity(column_count)
100 };
101 row_count
102 ];
103
104 for (i, column_name) in column_names.into_iter().enumerate() {
105 let column_schema = table_schema
106 .column_schema_by_name(column_name)
107 .with_context(|| ColumnNotFoundSnafu {
108 msg: format!("Column {} not found in table {}", column_name, &table_name),
109 })?;
110
111 let (datatype, datatype_extension) =
112 ColumnDataTypeWrapper::try_from(column_schema.data_type.clone())
113 .context(ColumnDataTypeSnafu)?
114 .to_parts();
115 let semantic_type = semantic_type(&table_info, column_name)?;
116
117 let grpc_column_schema = GrpcColumnSchema {
118 column_name: column_name.clone(),
119 datatype: datatype.into(),
120 semantic_type: semantic_type.into(),
121 datatype_extension,
122 options: options_from_column_schema(column_schema),
123 };
124 schema.push(grpc_column_schema);
125
126 for (sql_row, grpc_row) in sql_rows.iter().zip(rows.iter_mut()) {
127 let value = sql_value_to_grpc_value(
128 column_schema,
129 &sql_row[i],
130 Some(&query_ctx.timezone()),
131 query_ctx.auto_string_to_numeric(),
132 )?;
133 grpc_row.values.push(value);
134 }
135 }
136
137 let requests = Partitioner::new(self.partition_manager)
138 .partition_insert_requests(&table_info, Rows { schema, rows })
139 .await?;
140 let requests = RegionInsertRequests { requests };
141 if table_info.is_ttl_instant_table() {
142 Ok((
143 InstantAndNormalInsertRequests {
144 normal_requests: Default::default(),
145 instant_requests: requests,
146 },
147 table_info,
148 ))
149 } else {
150 Ok((
151 InstantAndNormalInsertRequests {
152 normal_requests: requests,
153 instant_requests: Default::default(),
154 },
155 table_info,
156 ))
157 }
158 }
159
160 async fn get_table(&self, catalog: &str, schema: &str, table: &str) -> Result<TableRef> {
161 self.catalog_manager
162 .table(catalog, schema, table, None)
163 .await
164 .context(CatalogSnafu)?
165 .with_context(|| TableNotFoundSnafu {
166 table_name: common_catalog::format_full_table_name(catalog, schema, table),
167 })
168 }
169
170 fn get_full_name(&self, obj_name: &ObjectName) -> Result<(String, String, String)> {
171 match &obj_name.0[..] {
172 [table] => Ok((
173 self.ctx.current_catalog().to_owned(),
174 self.ctx.current_schema(),
175 table.value.clone(),
176 )),
177 [schema, table] => Ok((
178 self.ctx.current_catalog().to_owned(),
179 schema.value.clone(),
180 table.value.clone(),
181 )),
182 [catalog, schema, table] => Ok((
183 catalog.value.clone(),
184 schema.value.clone(),
185 table.value.clone(),
186 )),
187 _ => InvalidSqlSnafu {
188 err_msg: format!(
189 "expect table name to be <catalog>.<schema>.<table>, <schema>.<table> or <table>, actual: {obj_name}",
190 ),
191 }.fail(),
192 }
193 }
194}
195
196fn column_names<'a>(stmt: &'a Insert, table_schema: &'a SchemaRef) -> Vec<&'a String> {
197 if !stmt.columns().is_empty() {
198 stmt.columns()
199 } else {
200 table_schema
201 .column_schemas()
202 .iter()
203 .map(|column| &column.name)
204 .collect()
205 }
206}
207
208fn sql_value_to_grpc_value(
212 column_schema: &ColumnSchema,
213 sql_val: &SqlValue,
214 timezone: Option<&Timezone>,
215 auto_string_to_numeric: bool,
216) -> Result<GrpcValue> {
217 let column = &column_schema.name;
218 let value = if replace_default(sql_val) {
219 let default_value = column_schema
220 .create_default()
221 .context(ColumnDefaultValueSnafu {
222 column: column.clone(),
223 })?;
224
225 default_value.context(ColumnNoneDefaultValueSnafu {
226 column: column.clone(),
227 })?
228 } else {
229 common_sql::convert::sql_value_to_value(
230 column,
231 &column_schema.data_type,
232 sql_val,
233 timezone,
234 None,
235 auto_string_to_numeric,
236 )
237 .context(crate::error::SqlCommonSnafu)?
238 };
239
240 let grpc_value = value_to_grpc_value(value);
241 Ok(grpc_value)
242}
243
244fn replace_default(sql_val: &SqlValue) -> bool {
245 matches!(sql_val, SqlValue::Placeholder(s) if s.to_lowercase() == DEFAULT_PLACEHOLDER_VALUE)
246}