operator/req_convert/insert/
fill_impure_default.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
//! Utility functions for filling columns that have impure default values in insert requests
16
17use std::sync::Arc;
18
19use ahash::{HashMap, HashMapExt, HashSet};
20use datatypes::schema::ColumnSchema;
21use snafu::{OptionExt, ResultExt};
22use store_api::storage::{RegionId, TableId};
23use table::metadata::{TableInfo, TableInfoRef};
24
25use crate::error::{ConvertColumnDefaultConstraintSnafu, Result, UnexpectedSnafu};
26use crate::expr_helper::column_schemas_to_defs;
27use crate::insert::InstantAndNormalInsertRequests;
28
29/// Find all columns that have impure default values
30pub fn find_all_impure_columns(table_info: &TableInfo) -> Vec<ColumnSchema> {
31    let columns = table_info.meta.schema.column_schemas();
32    columns
33        .iter()
34        .filter(|column| column.is_default_impure())
35        .cloned()
36        .collect()
37}
38
39/// Fill impure default values in the request
40pub struct ImpureDefaultFiller {
41    impure_columns: HashMap<String, (api::v1::ColumnSchema, Option<api::v1::Value>)>,
42}
43
44impl ImpureDefaultFiller {
45    pub fn new(table_info: TableInfoRef) -> Result<Self> {
46        let impure_column_list = find_all_impure_columns(&table_info);
47        let pks = &table_info.meta.primary_key_indices;
48        let pk_names = pks
49            .iter()
50            .map(|&i| table_info.meta.schema.column_name_by_index(i).to_string())
51            .collect::<Vec<_>>();
52        let mut impure_columns = HashMap::new();
53        for column in impure_column_list {
54            let default_value = column
55                .create_impure_default()
56                .with_context(|_| ConvertColumnDefaultConstraintSnafu {
57                    column_name: column.name.clone(),
58                })?
59                .with_context(|| UnexpectedSnafu {
60                    violated: format!(
61                        "Expect default value to be impure, found {:?}",
62                        column.default_constraint()
63                    ),
64                })?;
65            let grpc_default_value = api::helper::to_proto_value(default_value);
66            let def = column_schemas_to_defs(vec![column], &pk_names)?.swap_remove(0);
67            let grpc_column_schema = api::v1::ColumnSchema {
68                column_name: def.name,
69                datatype: def.data_type,
70                semantic_type: def.semantic_type,
71                datatype_extension: def.datatype_extension,
72                options: def.options,
73            };
74            impure_columns.insert(
75                grpc_column_schema.column_name.clone(),
76                (grpc_column_schema, grpc_default_value),
77            );
78        }
79        Ok(Self { impure_columns })
80    }
81
82    /// Fill impure default values in the request
83    pub fn fill_rows(&self, rows: &mut api::v1::Rows) {
84        let impure_columns_in_reqs: HashSet<_> = rows
85            .schema
86            .iter()
87            .filter_map(|schema| {
88                self.impure_columns
89                    .contains_key(&schema.column_name)
90                    .then_some(&schema.column_name)
91            })
92            .collect();
93
94        if self.impure_columns.len() == impure_columns_in_reqs.len() {
95            return;
96        }
97
98        let (schema_append, row_append): (Vec<_>, Vec<_>) = self
99            .impure_columns
100            .iter()
101            .filter_map(|(name, (schema, val))| {
102                if !impure_columns_in_reqs.contains(name) {
103                    Some((schema.clone(), val.clone().unwrap_or_default()))
104                } else {
105                    None
106                }
107            })
108            .unzip();
109
110        rows.schema.extend(schema_append);
111        for row in rows.rows.iter_mut() {
112            row.values.extend_from_slice(row_append.as_slice());
113        }
114    }
115}
116
117/// Fill impure default values in the request(only for normal insert requests, since instant insert can be filled in flownode directly as a single source of truth)
118pub fn fill_reqs_with_impure_default(
119    table_infos: &HashMap<TableId, Arc<TableInfo>>,
120    mut inserts: InstantAndNormalInsertRequests,
121) -> Result<InstantAndNormalInsertRequests> {
122    let fillers = table_infos
123        .iter()
124        .map(|(table_id, table_info)| {
125            let table_id = *table_id;
126            ImpureDefaultFiller::new(table_info.clone()).map(|filler| (table_id, filler))
127        })
128        .collect::<Result<HashMap<TableId, ImpureDefaultFiller>>>()?;
129
130    let normal_inserts = &mut inserts.normal_requests;
131    for request in normal_inserts.requests.iter_mut() {
132        let region_id = RegionId::from(request.region_id);
133        let table_id = region_id.table_id();
134        let filler = fillers.get(&table_id).with_context(|| UnexpectedSnafu {
135            violated: format!("impure default filler for table_id: {} not found", table_id),
136        })?;
137
138        if let Some(rows) = &mut request.rows {
139            filler.fill_rows(rows);
140        }
141    }
142    Ok(inserts)
143}
144
#[cfg(test)]
mod tests {
    use api::v1::value::ValueData;
    use datatypes::data_type::ConcreteDataType;
    use datatypes::schema::{ColumnDefaultConstraint, ColumnSchema, Schema, SchemaBuilder};
    use datatypes::value::Value;
    use table::metadata::{TableInfoBuilder, TableMetaBuilder};

    use super::*;

    /// Builds a test schema with three columns:
    /// `[col1 int32, ts timestamp(ms) DEFAULT now(), col2 int32 DEFAULT 1]`.
    fn new_test_schema() -> Schema {
        let col1 = ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true);

        // Time index whose default is the `now()` function.
        let ts = ColumnSchema::new(
            "ts",
            ConcreteDataType::timestamp_millisecond_datatype(),
            false,
        )
        .with_time_index(true)
        .with_default_constraint(Some(ColumnDefaultConstraint::Function(
            "now()".to_string(),
        )))
        .unwrap();

        // Constant default value.
        let col2 = ColumnSchema::new("col2", ConcreteDataType::int32_datatype(), true)
            .with_default_constraint(Some(ColumnDefaultConstraint::Value(Value::from(1i32))))
            .unwrap();

        let builder = SchemaBuilder::try_from(vec![col1, ts, col2]).unwrap();
        builder.version(123).build().unwrap()
    }

    /// Builds table info over [`new_test_schema`] with `col1` as the only primary key.
    pub fn new_table_info() -> TableInfo {
        let meta = TableMetaBuilder::empty()
            .schema(Arc::new(new_test_schema()))
            .primary_key_indices(vec![0])
            .engine("engine")
            .next_column_id(3)
            .build()
            .unwrap();

        TableInfoBuilder::default()
            .table_id(10)
            .table_version(5)
            .name("mytable")
            .meta(meta)
            .build()
            .unwrap()
    }

    /// Converts column schemas into their gRPC `ColumnSchema` form.
    fn column_schema_to_proto(
        column_schema: &[ColumnSchema],
        pk_names: &[String],
    ) -> Vec<api::v1::ColumnSchema> {
        let defs = column_schemas_to_defs(column_schema.to_vec(), pk_names).unwrap();
        let mut protos = Vec::with_capacity(defs.len());
        for def in defs {
            protos.push(api::v1::ColumnSchema {
                column_name: def.name,
                datatype: def.data_type,
                semantic_type: def.semantic_type,
                datatype_extension: def.datatype_extension,
                options: def.options,
            });
        }
        protos
    }

    /// A request that only carries `col1` gets the `ts` column appended.
    #[test]
    fn test_impure_append() {
        // Request under test: a single row holding only `col1`.
        let first_column = new_test_schema().column_schemas()[0].clone();
        let mut rows = api::v1::Rows {
            schema: column_schema_to_proto(&[first_column], &["col1".to_string()]),
            rows: vec![api::v1::Row {
                values: vec![api::v1::Value {
                    value_data: Some(ValueData::I32Value(42)),
                }],
            }],
        };

        let filler = ImpureDefaultFiller::new(Arc::new(new_table_info())).unwrap();
        filler.fill_rows(&mut rows);

        // Only `ts` is appended; `col2` is not.
        assert_eq!(rows.schema[1].column_name, "ts");
        assert_eq!(rows.schema.len(), 2);
        assert_eq!(rows.rows[0].values.len(), 2);
    }
}