operator/req_convert/insert/
fill_impure_default.rs1use std::sync::Arc;
18
19use ahash::{HashMap, HashMapExt, HashSet};
20use datatypes::schema::ColumnSchema;
21use snafu::{OptionExt, ResultExt};
22use store_api::storage::{RegionId, TableId};
23use table::metadata::{TableInfo, TableInfoRef};
24
25use crate::error::{ConvertColumnDefaultConstraintSnafu, Result, UnexpectedSnafu};
26use crate::expr_helper::column_schemas_to_defs;
27use crate::insert::InstantAndNormalInsertRequests;
28
29pub fn find_all_impure_columns(table_info: &TableInfo) -> Vec<ColumnSchema> {
31 let columns = table_info.meta.schema.column_schemas();
32 columns
33 .iter()
34 .filter(|column| column.is_default_impure())
35 .cloned()
36 .collect()
37}
38
39pub struct ImpureDefaultFiller {
42 impure_columns: HashMap<String, (api::v1::ColumnSchema, api::v1::Value)>,
43}
44
45impl ImpureDefaultFiller {
46 pub fn new(table_info: TableInfoRef) -> Result<Self> {
47 let impure_column_list = find_all_impure_columns(&table_info);
48 let pks = &table_info.meta.primary_key_indices;
49 let pk_names = pks
50 .iter()
51 .map(|&i| table_info.meta.schema.column_name_by_index(i).to_string())
52 .collect::<Vec<_>>();
53 let mut impure_columns = HashMap::new();
54 for column in impure_column_list {
55 let default_value = column
56 .create_impure_default()
57 .with_context(|_| ConvertColumnDefaultConstraintSnafu {
58 column_name: column.name.clone(),
59 })?
60 .with_context(|| UnexpectedSnafu {
61 violated: format!(
62 "Expect default value to be impure, found {:?}",
63 column.default_constraint()
64 ),
65 })?;
66 let grpc_default_value = api::helper::to_grpc_value(default_value);
67 let def = column_schemas_to_defs(vec![column], &pk_names)?.swap_remove(0);
68 let grpc_column_schema = api::v1::ColumnSchema {
69 column_name: def.name,
70 datatype: def.data_type,
71 semantic_type: def.semantic_type,
72 datatype_extension: def.datatype_extension,
73 options: def.options,
74 };
75 impure_columns.insert(
76 grpc_column_schema.column_name.clone(),
77 (grpc_column_schema, grpc_default_value),
78 );
79 }
80 Ok(Self { impure_columns })
81 }
82
83 pub fn fill_rows(&self, rows: &mut api::v1::Rows) {
85 let impure_columns_in_reqs: HashSet<_> = rows
86 .schema
87 .iter()
88 .filter_map(|schema| {
89 self.impure_columns
90 .contains_key(&schema.column_name)
91 .then_some(&schema.column_name)
92 })
93 .collect();
94
95 if self.impure_columns.len() == impure_columns_in_reqs.len() {
96 return;
97 }
98
99 let (schema_append, row_append): (Vec<_>, Vec<_>) = self
100 .impure_columns
101 .iter()
102 .filter_map(|(name, (schema, val))| {
103 if !impure_columns_in_reqs.contains(name) {
104 Some((schema.clone(), val.clone()))
105 } else {
106 None
107 }
108 })
109 .unzip();
110
111 rows.schema.extend(schema_append);
112 for row in rows.rows.iter_mut() {
113 row.values.extend_from_slice(row_append.as_slice());
114 }
115 }
116}
117
118pub fn fill_reqs_with_impure_default(
120 table_infos: &HashMap<TableId, Arc<TableInfo>>,
121 mut inserts: InstantAndNormalInsertRequests,
122) -> Result<InstantAndNormalInsertRequests> {
123 let fillers = table_infos
124 .iter()
125 .map(|(table_id, table_info)| {
126 let table_id = *table_id;
127 ImpureDefaultFiller::new(table_info.clone()).map(|filler| (table_id, filler))
128 })
129 .collect::<Result<HashMap<TableId, ImpureDefaultFiller>>>()?;
130
131 let normal_inserts = &mut inserts.normal_requests;
132 for request in normal_inserts.requests.iter_mut() {
133 let region_id = RegionId::from(request.region_id);
134 let table_id = region_id.table_id();
135 let filler = fillers.get(&table_id).with_context(|| UnexpectedSnafu {
136 violated: format!("impure default filler for table_id: {} not found", table_id),
137 })?;
138
139 if let Some(rows) = &mut request.rows {
140 filler.fill_rows(rows);
141 }
142 }
143 Ok(inserts)
144}
145
#[cfg(test)]
mod tests {
    use api::v1::value::ValueData;
    use datatypes::data_type::ConcreteDataType;
    use datatypes::schema::{ColumnSchema, Schema, SchemaBuilder};
    use datatypes::value::Value;
    use table::metadata::{TableInfoBuilder, TableMetaBuilder};

    use super::*;

    /// Builds a three-column schema: `col1` without a default, `ts` (time index)
    /// defaulting to the `now()` function, and `col2` defaulting to the constant 1.
    fn new_test_schema() -> Schema {
        let col1 = ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true);

        let now_default =
            datatypes::schema::ColumnDefaultConstraint::Function("now()".to_string());
        let ts = ColumnSchema::new(
            "ts",
            ConcreteDataType::timestamp_millisecond_datatype(),
            false,
        )
        .with_time_index(true)
        .with_default_constraint(Some(now_default))
        .unwrap();

        let const_default = datatypes::schema::ColumnDefaultConstraint::Value(Value::from(1i32));
        let col2 = ColumnSchema::new("col2", ConcreteDataType::int32_datatype(), true)
            .with_default_constraint(Some(const_default))
            .unwrap();

        let builder = SchemaBuilder::try_from(vec![col1, ts, col2]).unwrap();
        builder.version(123).build().unwrap()
    }

    /// Builds table info for table id 10 on top of the test schema, with `col1`
    /// (index 0) as the only primary key column.
    pub fn new_table_info() -> TableInfo {
        let meta = TableMetaBuilder::empty()
            .schema(Arc::new(new_test_schema()))
            .primary_key_indices(vec![0])
            .engine("engine")
            .next_column_id(3)
            .build()
            .unwrap();

        let mut builder = TableInfoBuilder::default();
        builder
            .table_id(10)
            .table_version(5)
            .name("mytable")
            .meta(meta);
        builder.build().unwrap()
    }

    /// Converts column schemas into gRPC column schemas via `column_schemas_to_defs`.
    fn column_schema_to_proto(
        column_schema: &[ColumnSchema],
        pk_names: &[String],
    ) -> Vec<api::v1::ColumnSchema> {
        let defs = column_schemas_to_defs(column_schema.to_vec(), pk_names).unwrap();
        let mut protos = Vec::with_capacity(defs.len());
        for def in defs {
            protos.push(api::v1::ColumnSchema {
                column_name: def.name,
                datatype: def.data_type,
                semantic_type: def.semantic_type,
                datatype_extension: def.datatype_extension,
                options: def.options,
            });
        }
        protos
    }

    /// A request carrying only `col1` must gain the impure `ts` column, while the
    /// constant-default `col2` must not be appended.
    #[test]
    fn test_impure_append() {
        let table_schema = new_test_schema();
        let col1 = table_schema.column_schemas()[0].clone();

        let mut rows = api::v1::Rows {
            schema: column_schema_to_proto(&[col1], &["col1".to_string()]),
            rows: vec![api::v1::Row {
                values: vec![api::v1::Value {
                    value_data: Some(ValueData::I32Value(42)),
                }],
            }],
        };

        let filler = ImpureDefaultFiller::new(Arc::new(new_table_info())).unwrap();
        filler.fill_rows(&mut rows);

        assert_eq!(rows.schema[1].column_name, "ts");
        assert_eq!(rows.schema.len(), 2);
        assert_eq!(rows.rows[0].values.len(), 2);
    }
}