operator/req_convert/insert/
stmt_to_region.rs1use api::helper::{value_to_grpc_value, ColumnDataTypeWrapper};
16use api::v1::column_def::options_from_column_schema;
17use api::v1::region::InsertRequests as RegionInsertRequests;
18use api::v1::{ColumnSchema as GrpcColumnSchema, Row, Rows, Value as GrpcValue};
19use catalog::CatalogManager;
20use common_time::Timezone;
21use datatypes::schema::{ColumnSchema, SchemaRef};
22use partition::manager::PartitionRuleManager;
23use session::context::{QueryContext, QueryContextRef};
24use snafu::{ensure, OptionExt, ResultExt};
25use sql::statements;
26use sql::statements::insert::Insert;
27use sqlparser::ast::{ObjectName, Value as SqlValue};
28use table::metadata::TableInfoRef;
29use table::TableRef;
30
31use crate::error::{
32 CatalogSnafu, ColumnDataTypeSnafu, ColumnDefaultValueSnafu, ColumnNoneDefaultValueSnafu,
33 ColumnNotFoundSnafu, InvalidSqlSnafu, MissingInsertBodySnafu, ParseSqlSnafu, Result,
34 SchemaReadOnlySnafu, TableNotFoundSnafu,
35};
36use crate::insert::InstantAndNormalInsertRequests;
37use crate::req_convert::common::partitioner::Partitioner;
38use crate::req_convert::insert::semantic_type;
39
40const DEFAULT_PLACEHOLDER_VALUE: &str = "default";
41
42pub struct StatementToRegion<'a> {
43 catalog_manager: &'a dyn CatalogManager,
44 partition_manager: &'a PartitionRuleManager,
45 ctx: &'a QueryContext,
46}
47
48impl<'a> StatementToRegion<'a> {
49 pub fn new(
50 catalog_manager: &'a dyn CatalogManager,
51 partition_manager: &'a PartitionRuleManager,
52 ctx: &'a QueryContext,
53 ) -> Self {
54 Self {
55 catalog_manager,
56 partition_manager,
57 ctx,
58 }
59 }
60
61 pub async fn convert(
62 &self,
63 stmt: &Insert,
64 query_ctx: &QueryContextRef,
65 ) -> Result<(InstantAndNormalInsertRequests, TableInfoRef)> {
66 let name = stmt.table_name().context(ParseSqlSnafu)?;
67 let (catalog, schema, table_name) = self.get_full_name(name)?;
68 let table = self.get_table(&catalog, &schema, &table_name).await?;
69 let table_schema = table.schema();
70 let table_info = table.table_info();
71
72 ensure!(
73 !common_catalog::consts::is_readonly_schema(&schema),
74 SchemaReadOnlySnafu { name: schema }
75 );
76
77 let column_names = column_names(stmt, &table_schema);
78 let column_count = column_names.len();
79
80 let sql_rows = stmt.values_body().context(MissingInsertBodySnafu)?;
81 let row_count = sql_rows.len();
82
83 sql_rows.iter().try_for_each(|r| {
84 ensure!(
85 r.len() == column_count,
86 InvalidSqlSnafu {
87 err_msg: format!(
88 "column count mismatch, columns: {}, values: {}",
89 column_count,
90 r.len()
91 )
92 }
93 );
94 Ok(())
95 })?;
96
97 let mut schema = Vec::with_capacity(column_count);
98 let mut rows = vec![
99 Row {
100 values: Vec::with_capacity(column_count)
101 };
102 row_count
103 ];
104
105 for (i, column_name) in column_names.into_iter().enumerate() {
106 let column_schema = table_schema
107 .column_schema_by_name(column_name)
108 .with_context(|| ColumnNotFoundSnafu {
109 msg: format!("Column {} not found in table {}", column_name, &table_name),
110 })?;
111
112 let (datatype, datatype_extension) =
113 ColumnDataTypeWrapper::try_from(column_schema.data_type.clone())
114 .context(ColumnDataTypeSnafu)?
115 .to_parts();
116 let semantic_type = semantic_type(&table_info, column_name)?;
117
118 let grpc_column_schema = GrpcColumnSchema {
119 column_name: column_name.clone(),
120 datatype: datatype.into(),
121 semantic_type: semantic_type.into(),
122 datatype_extension,
123 options: options_from_column_schema(column_schema),
124 };
125 schema.push(grpc_column_schema);
126
127 for (sql_row, grpc_row) in sql_rows.iter().zip(rows.iter_mut()) {
128 let value = sql_value_to_grpc_value(
129 column_schema,
130 &sql_row[i],
131 Some(&query_ctx.timezone()),
132 query_ctx.auto_string_to_numeric(),
133 )?;
134 grpc_row.values.push(value);
135 }
136 }
137
138 let requests = Partitioner::new(self.partition_manager)
139 .partition_insert_requests(table_info.table_id(), Rows { schema, rows })
140 .await?;
141 let requests = RegionInsertRequests { requests };
142 if table_info.is_ttl_instant_table() {
143 Ok((
144 InstantAndNormalInsertRequests {
145 normal_requests: Default::default(),
146 instant_requests: requests,
147 },
148 table_info,
149 ))
150 } else {
151 Ok((
152 InstantAndNormalInsertRequests {
153 normal_requests: requests,
154 instant_requests: Default::default(),
155 },
156 table_info,
157 ))
158 }
159 }
160
161 async fn get_table(&self, catalog: &str, schema: &str, table: &str) -> Result<TableRef> {
162 self.catalog_manager
163 .table(catalog, schema, table, None)
164 .await
165 .context(CatalogSnafu)?
166 .with_context(|| TableNotFoundSnafu {
167 table_name: common_catalog::format_full_table_name(catalog, schema, table),
168 })
169 }
170
171 fn get_full_name(&self, obj_name: &ObjectName) -> Result<(String, String, String)> {
172 match &obj_name.0[..] {
173 [table] => Ok((
174 self.ctx.current_catalog().to_owned(),
175 self.ctx.current_schema(),
176 table.value.clone(),
177 )),
178 [schema, table] => Ok((
179 self.ctx.current_catalog().to_owned(),
180 schema.value.clone(),
181 table.value.clone(),
182 )),
183 [catalog, schema, table] => Ok((
184 catalog.value.clone(),
185 schema.value.clone(),
186 table.value.clone(),
187 )),
188 _ => InvalidSqlSnafu {
189 err_msg: format!(
190 "expect table name to be <catalog>.<schema>.<table>, <schema>.<table> or <table>, actual: {obj_name}",
191 ),
192 }.fail(),
193 }
194 }
195}
196
197fn column_names<'a>(stmt: &'a Insert, table_schema: &'a SchemaRef) -> Vec<&'a String> {
198 if !stmt.columns().is_empty() {
199 stmt.columns()
200 } else {
201 table_schema
202 .column_schemas()
203 .iter()
204 .map(|column| &column.name)
205 .collect()
206 }
207}
208
209fn sql_value_to_grpc_value(
213 column_schema: &ColumnSchema,
214 sql_val: &SqlValue,
215 timezone: Option<&Timezone>,
216 auto_string_to_numeric: bool,
217) -> Result<GrpcValue> {
218 let column = &column_schema.name;
219 let value = if replace_default(sql_val) {
220 let default_value = column_schema
221 .create_default()
222 .context(ColumnDefaultValueSnafu {
223 column: column.clone(),
224 })?;
225
226 default_value.context(ColumnNoneDefaultValueSnafu {
227 column: column.clone(),
228 })?
229 } else {
230 statements::sql_value_to_value(
231 column,
232 &column_schema.data_type,
233 sql_val,
234 timezone,
235 None,
236 auto_string_to_numeric,
237 )
238 .context(ParseSqlSnafu)?
239 };
240
241 let grpc_value = value_to_grpc_value(value);
242 Ok(grpc_value)
243}
244
245fn replace_default(sql_val: &SqlValue) -> bool {
246 matches!(sql_val, SqlValue::Placeholder(s) if s.to_lowercase() == DEFAULT_PLACEHOLDER_VALUE)
247}