operator/req_convert/insert/
fill_impure_default.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
//! Utilities for filling columns that have impure default values into insert requests.
16
17use std::sync::Arc;
18
19use ahash::{HashMap, HashMapExt, HashSet};
20use datatypes::schema::ColumnSchema;
21use snafu::{OptionExt, ResultExt};
22use store_api::storage::{RegionId, TableId};
23use table::metadata::{TableInfo, TableInfoRef};
24
25use crate::error::{ConvertColumnDefaultConstraintSnafu, Result, UnexpectedSnafu};
26use crate::expr_helper::column_schemas_to_defs;
27use crate::insert::InstantAndNormalInsertRequests;
28
29/// Find all columns that have impure default values
30pub fn find_all_impure_columns(table_info: &TableInfo) -> Vec<ColumnSchema> {
31    let columns = table_info.meta.schema.column_schemas();
32    columns
33        .iter()
34        .filter(|column| column.is_default_impure())
35        .cloned()
36        .collect()
37}
38
39// TODO(yingwen): Support Bulk insert request.
40/// Fill impure default values in the request
41pub struct ImpureDefaultFiller {
42    impure_columns: HashMap<String, (api::v1::ColumnSchema, api::v1::Value)>,
43}
44
45impl ImpureDefaultFiller {
46    pub fn new(table_info: TableInfoRef) -> Result<Self> {
47        let impure_column_list = find_all_impure_columns(&table_info);
48        let pks = &table_info.meta.primary_key_indices;
49        let pk_names = pks
50            .iter()
51            .map(|&i| table_info.meta.schema.column_name_by_index(i).to_string())
52            .collect::<Vec<_>>();
53        let mut impure_columns = HashMap::new();
54        for column in impure_column_list {
55            let default_value = column
56                .create_impure_default()
57                .with_context(|_| ConvertColumnDefaultConstraintSnafu {
58                    column_name: column.name.clone(),
59                })?
60                .with_context(|| UnexpectedSnafu {
61                    violated: format!(
62                        "Expect default value to be impure, found {:?}",
63                        column.default_constraint()
64                    ),
65                })?;
66            let grpc_default_value = api::helper::to_grpc_value(default_value);
67            let def = column_schemas_to_defs(vec![column], &pk_names)?.swap_remove(0);
68            let grpc_column_schema = api::v1::ColumnSchema {
69                column_name: def.name,
70                datatype: def.data_type,
71                semantic_type: def.semantic_type,
72                datatype_extension: def.datatype_extension,
73                options: def.options,
74            };
75            impure_columns.insert(
76                grpc_column_schema.column_name.clone(),
77                (grpc_column_schema, grpc_default_value),
78            );
79        }
80        Ok(Self { impure_columns })
81    }
82
83    /// Fill impure default values in the request
84    pub fn fill_rows(&self, rows: &mut api::v1::Rows) {
85        let impure_columns_in_reqs: HashSet<_> = rows
86            .schema
87            .iter()
88            .filter_map(|schema| {
89                self.impure_columns
90                    .contains_key(&schema.column_name)
91                    .then_some(&schema.column_name)
92            })
93            .collect();
94
95        if self.impure_columns.len() == impure_columns_in_reqs.len() {
96            return;
97        }
98
99        let (schema_append, row_append): (Vec<_>, Vec<_>) = self
100            .impure_columns
101            .iter()
102            .filter_map(|(name, (schema, val))| {
103                if !impure_columns_in_reqs.contains(name) {
104                    Some((schema.clone(), val.clone()))
105                } else {
106                    None
107                }
108            })
109            .unzip();
110
111        rows.schema.extend(schema_append);
112        for row in rows.rows.iter_mut() {
113            row.values.extend_from_slice(row_append.as_slice());
114        }
115    }
116}
117
118/// Fill impure default values in the request(only for normal insert requests, since instant insert can be filled in flownode directly as a single source of truth)
119pub fn fill_reqs_with_impure_default(
120    table_infos: &HashMap<TableId, Arc<TableInfo>>,
121    mut inserts: InstantAndNormalInsertRequests,
122) -> Result<InstantAndNormalInsertRequests> {
123    let fillers = table_infos
124        .iter()
125        .map(|(table_id, table_info)| {
126            let table_id = *table_id;
127            ImpureDefaultFiller::new(table_info.clone()).map(|filler| (table_id, filler))
128        })
129        .collect::<Result<HashMap<TableId, ImpureDefaultFiller>>>()?;
130
131    let normal_inserts = &mut inserts.normal_requests;
132    for request in normal_inserts.requests.iter_mut() {
133        let region_id = RegionId::from(request.region_id);
134        let table_id = region_id.table_id();
135        let filler = fillers.get(&table_id).with_context(|| UnexpectedSnafu {
136            violated: format!("impure default filler for table_id: {} not found", table_id),
137        })?;
138
139        if let Some(rows) = &mut request.rows {
140            filler.fill_rows(rows);
141        }
142    }
143    Ok(inserts)
144}
145
#[cfg(test)]
mod tests {
    use api::v1::value::ValueData;
    use datatypes::data_type::ConcreteDataType;
    use datatypes::schema::{ColumnSchema, Schema, SchemaBuilder};
    use datatypes::value::Value;
    use table::metadata::{TableInfoBuilder, TableMetaBuilder};

    use super::*;

    /// Create a test schema with 3 columns:
    /// `[col1 int32, ts timestampmills DEFAULT now(), col2 int32 DEFAULT 1]`.
    ///
    /// Only `ts` has an impure default. `col2`'s constant default is pure.
    fn new_test_schema() -> Schema {
        let column_schemas = vec![
            ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true),
            ColumnSchema::new(
                "ts",
                ConcreteDataType::timestamp_millisecond_datatype(),
                false,
            )
            .with_time_index(true)
            .with_default_constraint(Some(datatypes::schema::ColumnDefaultConstraint::Function(
                "now()".to_string(),
            )))
            .unwrap(),
            ColumnSchema::new("col2", ConcreteDataType::int32_datatype(), true)
                .with_default_constraint(Some(datatypes::schema::ColumnDefaultConstraint::Value(
                    Value::from(1i32),
                )))
                .unwrap(),
        ];
        SchemaBuilder::try_from(column_schemas)
            .unwrap()
            .version(123)
            .build()
            .unwrap()
    }

    /// Create table info for `mytable` (id 10) over [`new_test_schema`], with
    /// `col1` (index 0) as the primary key.
    pub fn new_table_info() -> TableInfo {
        let schema = Arc::new(new_test_schema());
        let meta = TableMetaBuilder::empty()
            .schema(schema)
            .primary_key_indices(vec![0])
            .engine("engine")
            .next_column_id(3)
            .build()
            .unwrap();

        TableInfoBuilder::default()
            .table_id(10)
            .table_version(5)
            .name("mytable")
            .meta(meta)
            .build()
            .unwrap()
    }

    /// Convert column schemas to their gRPC `ColumnSchema` form, using the same
    /// `column_schemas_to_defs` conversion that `ImpureDefaultFiller::new` uses.
    fn column_schema_to_proto(
        column_schema: &[ColumnSchema],
        pk_names: &[String],
    ) -> Vec<api::v1::ColumnSchema> {
        column_schemas_to_defs(column_schema.to_vec(), pk_names)
            .unwrap()
            .into_iter()
            .map(|def| api::v1::ColumnSchema {
                column_name: def.name,
                datatype: def.data_type,
                semantic_type: def.semantic_type,
                datatype_extension: def.datatype_extension,
                options: def.options,
            })
            .collect()
    }

    /// A request missing the impure `ts` column gets it appended. The pure
    /// `col2` default is not appended.
    #[test]
    fn test_impure_append() {
        let row = api::v1::Row {
            values: vec![api::v1::Value {
                value_data: Some(ValueData::I32Value(42)),
            }],
        };
        let schema = new_test_schema().column_schemas()[0].clone();
        let col_schemas = column_schema_to_proto(&[schema], &["col1".to_string()]);

        let mut rows = api::v1::Rows {
            schema: col_schemas,
            rows: vec![row],
        };

        let info = new_table_info();
        let filler = ImpureDefaultFiller::new(Arc::new(info)).unwrap();
        filler.fill_rows(&mut rows);

        assert_eq!(rows.schema[1].column_name, "ts");
        assert!(rows.schema.len() == 2 && rows.rows[0].values.len() == 2);
        // The appended value is the evaluated `now()` default for a ms timestamp.
        assert!(matches!(
            rows.rows[0].values[1].value_data,
            Some(ValueData::TimestampMillisecondValue(_))
        ));
    }

    /// A request that already supplies every impure column is left unchanged,
    /// including the caller-provided value of that column.
    #[test]
    fn test_impure_already_present() {
        let all_columns = new_test_schema().column_schemas().to_vec();
        // Provide `col1` and `ts`; `ts` is the only impure column.
        let col_schemas = column_schema_to_proto(&all_columns[..2], &["col1".to_string()]);

        let mut rows = api::v1::Rows {
            schema: col_schemas,
            rows: vec![api::v1::Row {
                values: vec![
                    api::v1::Value {
                        value_data: Some(ValueData::I32Value(42)),
                    },
                    api::v1::Value {
                        value_data: Some(ValueData::TimestampMillisecondValue(1)),
                    },
                ],
            }],
        };

        let filler = ImpureDefaultFiller::new(Arc::new(new_table_info())).unwrap();
        filler.fill_rows(&mut rows);

        assert_eq!(rows.schema.len(), 2);
        assert_eq!(rows.rows[0].values.len(), 2);
        assert_eq!(
            rows.rows[0].values[1].value_data,
            Some(ValueData::TimestampMillisecondValue(1))
        );
    }
}