operator/req_convert/
common.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15pub(crate) mod partitioner;
16
17use std::collections::HashMap;
18
19use api::helper::ColumnDataTypeWrapper;
20use api::v1::column_data_type_extension::TypeExt;
21use api::v1::column_def::options_from_column_schema;
22use api::v1::value::ValueData;
23use api::v1::{
24    Column, ColumnDataType, ColumnDataTypeExtension, ColumnSchema, JsonTypeExtension, Row,
25    RowDeleteRequest, RowInsertRequest, Rows, SemanticType, Value,
26};
27use common_base::BitVec;
28use datatypes::prelude::ConcreteDataType;
29use datatypes::vectors::VectorRef;
30use snafu::ResultExt;
31use snafu::prelude::*;
32use table::metadata::TableInfo;
33
34use crate::error::{
35    ColumnDataTypeSnafu, ColumnNotFoundSnafu, InvalidInsertRequestSnafu, InvalidJsonFormatSnafu,
36    MissingTimeIndexColumnSnafu, Result, UnexpectedSnafu,
37};
38
39/// Encodes a string value as JSONB binary data if the value is of `StringValue` type.
40fn encode_string_to_jsonb_binary(value_data: ValueData) -> Result<ValueData> {
41    if let ValueData::StringValue(json) = &value_data {
42        let binary = jsonb::parse_value(json.as_bytes())
43            .map_err(|_| InvalidJsonFormatSnafu { json }.build())
44            .map(|jsonb| jsonb.to_vec())?;
45        Ok(ValueData::BinaryValue(binary))
46    } else {
47        UnexpectedSnafu {
48            violated: "Expected to value data to be a string.",
49        }
50        .fail()
51    }
52}
53
54/// Prepares row insertion requests by converting any JSON values to binary JSONB format.
55pub fn preprocess_row_insert_requests(requests: &mut Vec<RowInsertRequest>) -> Result<()> {
56    for request in requests {
57        validate_rows(&request.rows)?;
58        prepare_rows(&mut request.rows)?;
59    }
60
61    Ok(())
62}
63
64/// Prepares row deletion requests by converting any JSON values to binary JSONB format.
65pub fn preprocess_row_delete_requests(requests: &mut Vec<RowDeleteRequest>) -> Result<()> {
66    for request in requests {
67        validate_rows(&request.rows)?;
68        prepare_rows(&mut request.rows)?;
69    }
70
71    Ok(())
72}
73
74fn prepare_rows(rows: &mut Option<Rows>) -> Result<()> {
75    if let Some(rows) = rows {
76        let indexes = rows
77            .schema
78            .iter()
79            .enumerate()
80            .filter_map(|(idx, schema)| {
81                if schema.datatype() == ColumnDataType::Json {
82                    Some(idx)
83                } else {
84                    None
85                }
86            })
87            .collect::<Vec<_>>();
88        for idx in &indexes {
89            let column = &mut rows.schema[*idx];
90            column.datatype_extension = Some(ColumnDataTypeExtension {
91                type_ext: Some(TypeExt::JsonType(JsonTypeExtension::JsonBinary.into())),
92            });
93            column.datatype = ColumnDataType::Json.into();
94        }
95
96        for idx in &indexes {
97            for row in &mut rows.rows {
98                if let Some(value_data) = row.values[*idx].value_data.take() {
99                    row.values[*idx].value_data = Some(encode_string_to_jsonb_binary(value_data)?);
100                }
101            }
102        }
103    }
104
105    Ok(())
106}
107
108fn validate_rows(rows: &Option<Rows>) -> Result<()> {
109    let Some(rows) = rows else {
110        return Ok(());
111    };
112
113    for (col_idx, schema) in rows.schema.iter().enumerate() {
114        let column_type =
115            ColumnDataTypeWrapper::try_new(schema.datatype, schema.datatype_extension.clone())
116                .context(ColumnDataTypeSnafu)?
117                .into();
118
119        let ConcreteDataType::Vector(d) = column_type else {
120            return Ok(());
121        };
122
123        for row in &rows.rows {
124            let value = &row.values[col_idx].value_data;
125            if let Some(data) = value {
126                validate_vector_col(data, d.dim)?;
127            }
128        }
129    }
130
131    Ok(())
132}
133
134fn validate_vector_col(data: &ValueData, dim: u32) -> Result<()> {
135    let data = match data {
136        ValueData::BinaryValue(data) => data,
137        _ => {
138            return InvalidInsertRequestSnafu {
139                reason: "Expecting binary data for vector column.".to_string(),
140            }
141            .fail();
142        }
143    };
144
145    let expected_len = dim as usize * std::mem::size_of::<f32>();
146    if data.len() != expected_len {
147        return InvalidInsertRequestSnafu {
148            reason: format!(
149                "Expecting {} bytes of data for vector column, but got {}.",
150                expected_len,
151                data.len()
152            ),
153        }
154        .fail();
155    }
156
157    Ok(())
158}
159
160pub fn columns_to_rows(columns: Vec<Column>, row_count: u32) -> Result<Rows> {
161    let row_count = row_count as usize;
162    let column_count = columns.len();
163    let mut schema = Vec::with_capacity(column_count);
164    let mut rows = vec![
165        Row {
166            values: Vec::with_capacity(column_count)
167        };
168        row_count
169    ];
170    for column in columns {
171        let column_schema = ColumnSchema {
172            column_name: column.column_name.clone(),
173            datatype: column.datatype,
174            semantic_type: column.semantic_type,
175            datatype_extension: column.datatype_extension.clone(),
176            options: column.options.clone(),
177        };
178        schema.push(column_schema);
179
180        push_column_to_rows(column, &mut rows)?;
181    }
182
183    Ok(Rows { schema, rows })
184}
185
186fn push_column_to_rows(column: Column, rows: &mut [Row]) -> Result<()> {
187    let null_mask = BitVec::from_vec(column.null_mask);
188    let column_type = ColumnDataTypeWrapper::try_new(column.datatype, column.datatype_extension)
189        .context(ColumnDataTypeSnafu)?
190        .datatype();
191    let column_values = column.values.unwrap_or_default();
192
193    macro_rules! push_column_values_match_types {
194        ($( ($arm:tt, $value_data_variant:tt, $field_name:tt), )*) => { match column_type { $(
195
196        ColumnDataType::$arm => {
197            let row_count = rows.len();
198            let actual_row_count = null_mask.count_ones() + column_values.$field_name.len();
199            ensure!(
200                actual_row_count == row_count,
201                InvalidInsertRequestSnafu {
202                    reason: format!(
203                        "Expecting {} rows of data for column '{}', but got {}.",
204                        row_count, column.column_name, actual_row_count
205                    ),
206                }
207            );
208
209            let mut null_mask_iter = null_mask.into_iter();
210            let mut values_iter = column_values.$field_name.into_iter();
211
212            for row in rows {
213                let value_is_null = null_mask_iter.next();
214                if value_is_null == Some(true) {
215                    row.values.push(Value { value_data: None });
216                } else {
217                    // previous check ensures that there is a value for each row
218                    let value = values_iter.next().unwrap();
219                    row.values.push(Value {
220                        value_data: Some(ValueData::$value_data_variant(value)),
221                    });
222                }
223            }
224        }
225
226        )* _ => {
227            return InvalidInsertRequestSnafu {
228                reason: format!(
229                    "Column '{}' with type {:?} is not supported in row inserts.",
230                    column.column_name, column_type
231                ),
232            }
233            .fail();
234        } }}
235    }
236
237    push_column_values_match_types!(
238        (Boolean, BoolValue, bool_values),
239        (Int8, I8Value, i8_values),
240        (Int16, I16Value, i16_values),
241        (Int32, I32Value, i32_values),
242        (Int64, I64Value, i64_values),
243        (Uint8, U8Value, u8_values),
244        (Uint16, U16Value, u16_values),
245        (Uint32, U32Value, u32_values),
246        (Uint64, U64Value, u64_values),
247        (Float32, F32Value, f32_values),
248        (Float64, F64Value, f64_values),
249        (Binary, BinaryValue, binary_values),
250        (String, StringValue, string_values),
251        (Json, StringValue, string_values),
252        (Date, DateValue, date_values),
253        (Datetime, DatetimeValue, datetime_values),
254        (
255            TimestampSecond,
256            TimestampSecondValue,
257            timestamp_second_values
258        ),
259        (
260            TimestampMillisecond,
261            TimestampMillisecondValue,
262            timestamp_millisecond_values
263        ),
264        (
265            TimestampMicrosecond,
266            TimestampMicrosecondValue,
267            timestamp_microsecond_values
268        ),
269        (
270            TimestampNanosecond,
271            TimestampNanosecondValue,
272            timestamp_nanosecond_values
273        ),
274        (TimeSecond, TimeSecondValue, time_second_values),
275        (
276            TimeMillisecond,
277            TimeMillisecondValue,
278            time_millisecond_values
279        ),
280        (
281            TimeMicrosecond,
282            TimeMicrosecondValue,
283            time_microsecond_values
284        ),
285        (TimeNanosecond, TimeNanosecondValue, time_nanosecond_values),
286        (
287            IntervalYearMonth,
288            IntervalYearMonthValue,
289            interval_year_month_values
290        ),
291        (
292            IntervalDayTime,
293            IntervalDayTimeValue,
294            interval_day_time_values
295        ),
296        (
297            IntervalMonthDayNano,
298            IntervalMonthDayNanoValue,
299            interval_month_day_nano_values
300        ),
301        (Decimal128, Decimal128Value, decimal128_values),
302        (Vector, BinaryValue, binary_values),
303        (List, ListValue, list_values),
304        (Struct, StructValue, struct_values),
305    );
306
307    Ok(())
308}
309
310pub fn row_count(columns: &HashMap<String, VectorRef>) -> Result<usize> {
311    let mut columns_iter = columns.values();
312
313    let len = columns_iter
314        .next()
315        .map(|column| column.len())
316        .unwrap_or_default();
317    ensure!(
318        columns_iter.all(|column| column.len() == len),
319        InvalidInsertRequestSnafu {
320            reason: "The row count of columns is not the same."
321        }
322    );
323
324    Ok(len)
325}
326
327pub fn column_schema(
328    table_info: &TableInfo,
329    columns: &HashMap<String, VectorRef>,
330) -> Result<Vec<ColumnSchema>> {
331    columns
332        .keys()
333        .map(|column_name| {
334            let column_schema = table_info
335                .meta
336                .schema
337                .column_schema_by_name(column_name)
338                .context(ColumnNotFoundSnafu {
339                    msg: format!("unable to find column {column_name} in table schema"),
340                })?;
341
342            let (datatype, datatype_extension) =
343                ColumnDataTypeWrapper::try_from(column_schema.data_type.clone())
344                    .context(ColumnDataTypeSnafu)?
345                    .to_parts();
346
347            Ok(ColumnSchema {
348                column_name: column_name.clone(),
349                datatype: datatype as i32,
350                semantic_type: semantic_type(table_info, column_name)?.into(),
351                datatype_extension,
352                options: options_from_column_schema(column_schema),
353            })
354        })
355        .collect::<Result<Vec<_>>>()
356}
357
358fn semantic_type(table_info: &TableInfo, column: &str) -> Result<SemanticType> {
359    let table_meta = &table_info.meta;
360    let table_schema = &table_meta.schema;
361
362    let time_index_column = &table_schema
363        .timestamp_column()
364        .with_context(|| table::error::MissingTimeIndexColumnSnafu {
365            table_name: table_info.name.clone(),
366        })
367        .context(MissingTimeIndexColumnSnafu)?
368        .name;
369
370    let semantic_type = if column == time_index_column {
371        SemanticType::Timestamp
372    } else {
373        let column_index = table_schema.column_index_by_name(column);
374        let column_index = column_index.context(ColumnNotFoundSnafu {
375            msg: format!("unable to find column {column} in table schema"),
376        })?;
377
378        if table_meta.primary_key_indices.contains(&column_index) {
379            SemanticType::Tag
380        } else {
381            SemanticType::Field
382        }
383    };
384
385    Ok(semantic_type)
386}
387
#[cfg(test)]
mod tests {
    use api::v1::column::Values;
    use api::v1::{SemanticType, VectorTypeExtension};
    use common_base::bit_vec::prelude::*;

    use super::*;

    /// Builds the `Int32` field column `col1` with the given null mask and values.
    fn int32_field_column(null_mask: Vec<u8>, values: Values) -> Column {
        Column {
            column_name: String::from("col1"),
            datatype: ColumnDataType::Int32.into(),
            semantic_type: SemanticType::Field.into(),
            null_mask,
            values: Some(values),
            ..Default::default()
        }
    }

    /// Builds a vector type extension with the given dimension.
    fn vector_extension(dim: u32) -> Option<ColumnDataTypeExtension> {
        Some(ColumnDataTypeExtension {
            type_ext: Some(TypeExt::VectorType(VectorTypeExtension { dim })),
        })
    }

    #[test]
    fn test_request_column_to_row() {
        let columns = vec![
            int32_field_column(
                bitvec![u8, Lsb0; 1, 0, 1].into_vec(),
                Values {
                    i32_values: vec![42],
                    ..Default::default()
                },
            ),
            Column {
                column_name: String::from("col2"),
                datatype: ColumnDataType::String.into(),
                semantic_type: SemanticType::Tag.into(),
                null_mask: vec![],
                values: Some(Values {
                    string_values: vec![
                        String::from("value1"),
                        String::from("value2"),
                        String::from("value3"),
                    ],
                    ..Default::default()
                }),
                ..Default::default()
            },
            Column {
                column_name: String::from("col3"),
                datatype: ColumnDataType::Vector.into(),
                semantic_type: SemanticType::Field.into(),
                null_mask: vec![],
                values: Some(Values {
                    binary_values: vec![vec![0; 4], vec![1; 4], vec![2; 4]],
                    ..Default::default()
                }),
                datatype_extension: vector_extension(1),
                ..Default::default()
            },
        ];

        let rows = columns_to_rows(columns, 3).unwrap();

        let expected_schema = [
            ("col1", ColumnDataType::Int32, SemanticType::Field),
            ("col2", ColumnDataType::String, SemanticType::Tag),
            ("col3", ColumnDataType::Vector, SemanticType::Field),
        ];
        assert_eq!(rows.schema.len(), expected_schema.len());
        for (schema, (name, datatype, semantic_type)) in rows.schema.iter().zip(expected_schema) {
            assert_eq!(schema.column_name, name);
            assert_eq!(schema.datatype, datatype as i32);
            assert_eq!(schema.semantic_type, semantic_type as i32);
        }
        assert_eq!(rows.schema[2].datatype_extension, vector_extension(1));

        // (col1 value, col2 string, byte used to fill the 4-byte col3 vector)
        let expected_rows = [
            (None, "value1", 0u8),
            (Some(ValueData::I32Value(42)), "value2", 1),
            (None, "value3", 2),
        ];
        assert_eq!(rows.rows.len(), expected_rows.len());
        for (row, (col1, col2, fill)) in rows.rows.iter().zip(expected_rows) {
            assert_eq!(row.values.len(), 3);
            assert_eq!(row.values[0].value_data, col1);
            assert_eq!(
                row.values[1].value_data,
                Some(ValueData::StringValue(col2.to_string()))
            );
            assert_eq!(
                row.values[2].value_data,
                Some(ValueData::BinaryValue(vec![fill; 4]))
            );
        }

        // Each column below is inconsistent with 3 rows. The first has values in the wrong
        // typed slot. The second has too few values for its null mask. The third has too
        // few values and no null mask.
        let invalid_columns = [
            int32_field_column(
                bitvec![u8, Lsb0; 1, 0, 1].into_vec(),
                Values {
                    i8_values: vec![42],
                    ..Default::default()
                },
            ),
            int32_field_column(
                bitvec![u8, Lsb0; 0, 0, 1].into_vec(),
                Values {
                    i32_values: vec![42],
                    ..Default::default()
                },
            ),
            int32_field_column(
                vec![],
                Values {
                    i32_values: vec![42],
                    ..Default::default()
                },
            ),
        ];
        for column in invalid_columns {
            assert!(columns_to_rows(vec![column], 3).is_err());
        }
    }

    #[test]
    fn test_validate_vector_row_success() {
        for (byte_len, dim) in [(4, 1), (8, 2), (12, 3)] {
            let data = ValueData::BinaryValue(vec![0; byte_len]);
            assert!(validate_vector_col(&data, dim).is_ok());
        }
    }

    #[test]
    fn test_validate_vector_row_fail_wrong_type() {
        assert!(validate_vector_col(&ValueData::I32Value(42), 1).is_err());
    }

    #[test]
    fn test_validate_vector_row_fail_wrong_length() {
        for (byte_len, dim) in [(8, 1), (4, 2)] {
            let data = ValueData::BinaryValue(vec![0; byte_len]);
            assert!(validate_vector_col(&data, dim).is_err());
        }
    }
}