operator/req_convert/insert/
fill_impure_default.rs1use std::sync::Arc;
18
19use ahash::{HashMap, HashMapExt, HashSet};
20use datatypes::schema::ColumnSchema;
21use snafu::{OptionExt, ResultExt};
22use store_api::storage::{RegionId, TableId};
23use table::metadata::{TableInfo, TableInfoRef};
24
25use crate::error::{ConvertColumnDefaultConstraintSnafu, Result, UnexpectedSnafu};
26use crate::expr_helper::column_schemas_to_defs;
27use crate::insert::InstantAndNormalInsertRequests;
28
29pub fn find_all_impure_columns(table_info: &TableInfo) -> Vec<ColumnSchema> {
31 let columns = table_info.meta.schema.column_schemas();
32 columns
33 .iter()
34 .filter(|column| column.is_default_impure())
35 .cloned()
36 .collect()
37}
38
39pub struct ImpureDefaultFiller {
41 impure_columns: HashMap<String, (api::v1::ColumnSchema, Option<api::v1::Value>)>,
42}
43
44impl ImpureDefaultFiller {
45 pub fn new(table_info: TableInfoRef) -> Result<Self> {
46 let impure_column_list = find_all_impure_columns(&table_info);
47 let pks = &table_info.meta.primary_key_indices;
48 let pk_names = pks
49 .iter()
50 .map(|&i| table_info.meta.schema.column_name_by_index(i).to_string())
51 .collect::<Vec<_>>();
52 let mut impure_columns = HashMap::new();
53 for column in impure_column_list {
54 let default_value = column
55 .create_impure_default()
56 .with_context(|_| ConvertColumnDefaultConstraintSnafu {
57 column_name: column.name.clone(),
58 })?
59 .with_context(|| UnexpectedSnafu {
60 violated: format!(
61 "Expect default value to be impure, found {:?}",
62 column.default_constraint()
63 ),
64 })?;
65 let grpc_default_value = api::helper::to_proto_value(default_value);
66 let def = column_schemas_to_defs(vec![column], &pk_names)?.swap_remove(0);
67 let grpc_column_schema = api::v1::ColumnSchema {
68 column_name: def.name,
69 datatype: def.data_type,
70 semantic_type: def.semantic_type,
71 datatype_extension: def.datatype_extension,
72 options: def.options,
73 };
74 impure_columns.insert(
75 grpc_column_schema.column_name.clone(),
76 (grpc_column_schema, grpc_default_value),
77 );
78 }
79 Ok(Self { impure_columns })
80 }
81
82 pub fn fill_rows(&self, rows: &mut api::v1::Rows) {
84 let impure_columns_in_reqs: HashSet<_> = rows
85 .schema
86 .iter()
87 .filter_map(|schema| {
88 self.impure_columns
89 .contains_key(&schema.column_name)
90 .then_some(&schema.column_name)
91 })
92 .collect();
93
94 if self.impure_columns.len() == impure_columns_in_reqs.len() {
95 return;
96 }
97
98 let (schema_append, row_append): (Vec<_>, Vec<_>) = self
99 .impure_columns
100 .iter()
101 .filter_map(|(name, (schema, val))| {
102 if !impure_columns_in_reqs.contains(name) {
103 Some((schema.clone(), val.clone().unwrap_or_default()))
104 } else {
105 None
106 }
107 })
108 .unzip();
109
110 rows.schema.extend(schema_append);
111 for row in rows.rows.iter_mut() {
112 row.values.extend_from_slice(row_append.as_slice());
113 }
114 }
115}
116
117pub fn fill_reqs_with_impure_default(
119 table_infos: &HashMap<TableId, Arc<TableInfo>>,
120 mut inserts: InstantAndNormalInsertRequests,
121) -> Result<InstantAndNormalInsertRequests> {
122 let fillers = table_infos
123 .iter()
124 .map(|(table_id, table_info)| {
125 let table_id = *table_id;
126 ImpureDefaultFiller::new(table_info.clone()).map(|filler| (table_id, filler))
127 })
128 .collect::<Result<HashMap<TableId, ImpureDefaultFiller>>>()?;
129
130 let normal_inserts = &mut inserts.normal_requests;
131 for request in normal_inserts.requests.iter_mut() {
132 let region_id = RegionId::from(request.region_id);
133 let table_id = region_id.table_id();
134 let filler = fillers.get(&table_id).with_context(|| UnexpectedSnafu {
135 violated: format!("impure default filler for table_id: {} not found", table_id),
136 })?;
137
138 if let Some(rows) = &mut request.rows {
139 filler.fill_rows(rows);
140 }
141 }
142 Ok(inserts)
143}
144
#[cfg(test)]
mod tests {
    use api::v1::value::ValueData;
    use datatypes::data_type::ConcreteDataType;
    use datatypes::schema::{ColumnSchema, Schema, SchemaBuilder};
    use datatypes::value::Value;
    use table::metadata::{TableInfoBuilder, TableMetaBuilder};

    use super::*;

    /// Builds a three-column schema: nullable `col1` (int32), `ts`
    /// (millisecond timestamp, time index, default `now()`) and nullable
    /// `col2` (int32, constant default `1`).
    fn new_test_schema() -> Schema {
        let column_schemas = vec![
            ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true),
            ColumnSchema::new(
                "ts",
                ConcreteDataType::timestamp_millisecond_datatype(),
                false,
            )
            .with_time_index(true)
            .with_default_constraint(Some(datatypes::schema::ColumnDefaultConstraint::Function(
                "now()".to_string(),
            )))
            .unwrap(),
            ColumnSchema::new("col2", ConcreteDataType::int32_datatype(), true)
                .with_default_constraint(Some(datatypes::schema::ColumnDefaultConstraint::Value(
                    Value::from(1i32),
                )))
                .unwrap(),
        ];
        SchemaBuilder::try_from(column_schemas)
            .unwrap()
            .version(123)
            .build()
            .unwrap()
    }

    /// Builds table info over [`new_test_schema`] with `col1` (index 0) as the
    /// only primary key column.
    pub fn new_table_info() -> TableInfo {
        let schema = Arc::new(new_test_schema());
        let meta = TableMetaBuilder::empty()
            .schema(schema)
            .primary_key_indices(vec![0])
            .engine("engine")
            .next_column_id(3)
            .build()
            .unwrap();

        TableInfoBuilder::default()
            .table_id(10)
            .table_version(5)
            .name("mytable")
            .meta(meta)
            .build()
            .unwrap()
    }

    /// Converts column schemas into gRPC column schemas the same way
    /// `ImpureDefaultFiller::new` does, so test input matches production shape.
    fn column_schema_to_proto(
        column_schema: &[ColumnSchema],
        pk_names: &[String],
    ) -> Vec<api::v1::ColumnSchema> {
        column_schemas_to_defs(column_schema.to_vec(), pk_names)
            .unwrap()
            .into_iter()
            .map(|def| api::v1::ColumnSchema {
                column_name: def.name,
                datatype: def.data_type,
                semantic_type: def.semantic_type,
                datatype_extension: def.datatype_extension,
                options: def.options,
            })
            .collect()
    }

    /// A request that only carries `col1` must get the impure `ts` column
    /// appended, while the pure-default `col2` is left out.
    #[test]
    fn test_impure_append() {
        let row = api::v1::Row {
            values: vec![api::v1::Value {
                value_data: Some(ValueData::I32Value(42)),
            }],
        };
        let schema = new_test_schema().column_schemas()[0].clone();
        let col_schemas = column_schema_to_proto(&[schema], &["col1".to_string()]);

        let mut rows = api::v1::Rows {
            schema: col_schemas,
            rows: vec![row],
        };

        let info = new_table_info();
        let filler = ImpureDefaultFiller::new(Arc::new(info)).unwrap();
        filler.fill_rows(&mut rows);

        // Check lengths first, one assertion per condition, so that a failure
        // reports which side is wrong instead of panicking on an index.
        assert_eq!(rows.schema.len(), 2);
        assert_eq!(rows.schema[0].column_name, "col1");
        assert_eq!(rows.schema[1].column_name, "ts");

        let values = &rows.rows[0].values;
        assert_eq!(values.len(), 2);
        // The caller-supplied value must survive untouched ...
        assert_eq!(values[0].value_data, Some(ValueData::I32Value(42)));
        // ... and the appended cell must actually carry a default value.
        assert!(values[1].value_data.is_some());
    }
}