1pub(crate) mod partitioner;
16
17use std::collections::HashMap;
18
19use api::helper::ColumnDataTypeWrapper;
20use api::v1::column_data_type_extension::TypeExt;
21use api::v1::column_def::options_from_column_schema;
22use api::v1::value::ValueData;
23use api::v1::{
24 Column, ColumnDataType, ColumnDataTypeExtension, ColumnSchema, JsonTypeExtension, Row,
25 RowDeleteRequest, RowInsertRequest, Rows, SemanticType, Value,
26};
27use common_base::BitVec;
28use datatypes::prelude::ConcreteDataType;
29use datatypes::vectors::VectorRef;
30use snafu::ResultExt;
31use snafu::prelude::*;
32use table::metadata::TableInfo;
33
34use crate::error::{
35 ColumnDataTypeSnafu, ColumnNotFoundSnafu, InvalidInsertRequestSnafu, InvalidJsonFormatSnafu,
36 MissingTimeIndexColumnSnafu, Result, UnexpectedSnafu,
37};
38
39fn encode_string_to_jsonb_binary(value_data: ValueData) -> Result<ValueData> {
41 if let ValueData::StringValue(json) = &value_data {
42 let binary = jsonb::parse_value(json.as_bytes())
43 .map_err(|_| InvalidJsonFormatSnafu { json }.build())
44 .map(|jsonb| jsonb.to_vec())?;
45 Ok(ValueData::BinaryValue(binary))
46 } else {
47 UnexpectedSnafu {
48 violated: "Expected to value data to be a string.",
49 }
50 .fail()
51 }
52}
53
54pub fn preprocess_row_insert_requests(requests: &mut Vec<RowInsertRequest>) -> Result<()> {
56 for request in requests {
57 validate_rows(&request.rows)?;
58 prepare_rows(&mut request.rows)?;
59 }
60
61 Ok(())
62}
63
64pub fn preprocess_row_delete_requests(requests: &mut Vec<RowDeleteRequest>) -> Result<()> {
66 for request in requests {
67 validate_rows(&request.rows)?;
68 prepare_rows(&mut request.rows)?;
69 }
70
71 Ok(())
72}
73
74fn prepare_rows(rows: &mut Option<Rows>) -> Result<()> {
75 if let Some(rows) = rows {
76 let indexes = rows
77 .schema
78 .iter()
79 .enumerate()
80 .filter_map(|(idx, schema)| {
81 if schema.datatype() == ColumnDataType::Json {
82 Some(idx)
83 } else {
84 None
85 }
86 })
87 .collect::<Vec<_>>();
88 for idx in &indexes {
89 let column = &mut rows.schema[*idx];
90 column.datatype_extension = Some(ColumnDataTypeExtension {
91 type_ext: Some(TypeExt::JsonType(JsonTypeExtension::JsonBinary.into())),
92 });
93 column.datatype = ColumnDataType::Json.into();
94 }
95
96 for idx in &indexes {
97 for row in &mut rows.rows {
98 if let Some(value_data) = row.values[*idx].value_data.take() {
99 row.values[*idx].value_data = Some(encode_string_to_jsonb_binary(value_data)?);
100 }
101 }
102 }
103 }
104
105 Ok(())
106}
107
108fn validate_rows(rows: &Option<Rows>) -> Result<()> {
109 let Some(rows) = rows else {
110 return Ok(());
111 };
112
113 for (col_idx, schema) in rows.schema.iter().enumerate() {
114 let column_type =
115 ColumnDataTypeWrapper::try_new(schema.datatype, schema.datatype_extension.clone())
116 .context(ColumnDataTypeSnafu)?
117 .into();
118
119 let ConcreteDataType::Vector(d) = column_type else {
120 return Ok(());
121 };
122
123 for row in &rows.rows {
124 let value = &row.values[col_idx].value_data;
125 if let Some(data) = value {
126 validate_vector_col(data, d.dim)?;
127 }
128 }
129 }
130
131 Ok(())
132}
133
134fn validate_vector_col(data: &ValueData, dim: u32) -> Result<()> {
135 let data = match data {
136 ValueData::BinaryValue(data) => data,
137 _ => {
138 return InvalidInsertRequestSnafu {
139 reason: "Expecting binary data for vector column.".to_string(),
140 }
141 .fail();
142 }
143 };
144
145 let expected_len = dim as usize * std::mem::size_of::<f32>();
146 if data.len() != expected_len {
147 return InvalidInsertRequestSnafu {
148 reason: format!(
149 "Expecting {} bytes of data for vector column, but got {}.",
150 expected_len,
151 data.len()
152 ),
153 }
154 .fail();
155 }
156
157 Ok(())
158}
159
160pub fn columns_to_rows(columns: Vec<Column>, row_count: u32) -> Result<Rows> {
161 let row_count = row_count as usize;
162 let column_count = columns.len();
163 let mut schema = Vec::with_capacity(column_count);
164 let mut rows = vec![
165 Row {
166 values: Vec::with_capacity(column_count)
167 };
168 row_count
169 ];
170 for column in columns {
171 let column_schema = ColumnSchema {
172 column_name: column.column_name.clone(),
173 datatype: column.datatype,
174 semantic_type: column.semantic_type,
175 datatype_extension: column.datatype_extension.clone(),
176 options: column.options.clone(),
177 };
178 schema.push(column_schema);
179
180 push_column_to_rows(column, &mut rows)?;
181 }
182
183 Ok(Rows { schema, rows })
184}
185
186fn push_column_to_rows(column: Column, rows: &mut [Row]) -> Result<()> {
187 let null_mask = BitVec::from_vec(column.null_mask);
188 let column_type = ColumnDataTypeWrapper::try_new(column.datatype, column.datatype_extension)
189 .context(ColumnDataTypeSnafu)?
190 .datatype();
191 let column_values = column.values.unwrap_or_default();
192
193 macro_rules! push_column_values_match_types {
194 ($( ($arm:tt, $value_data_variant:tt, $field_name:tt), )*) => { match column_type { $(
195
196 ColumnDataType::$arm => {
197 let row_count = rows.len();
198 let actual_row_count = null_mask.count_ones() + column_values.$field_name.len();
199 ensure!(
200 actual_row_count == row_count,
201 InvalidInsertRequestSnafu {
202 reason: format!(
203 "Expecting {} rows of data for column '{}', but got {}.",
204 row_count, column.column_name, actual_row_count
205 ),
206 }
207 );
208
209 let mut null_mask_iter = null_mask.into_iter();
210 let mut values_iter = column_values.$field_name.into_iter();
211
212 for row in rows {
213 let value_is_null = null_mask_iter.next();
214 if value_is_null == Some(true) {
215 row.values.push(Value { value_data: None });
216 } else {
217 let value = values_iter.next().unwrap();
219 row.values.push(Value {
220 value_data: Some(ValueData::$value_data_variant(value)),
221 });
222 }
223 }
224 }
225
226 )* _ => {
227 return InvalidInsertRequestSnafu {
228 reason: format!(
229 "Column '{}' with type {:?} is not supported in row inserts.",
230 column.column_name, column_type
231 ),
232 }
233 .fail();
234 } }}
235 }
236
237 push_column_values_match_types!(
238 (Boolean, BoolValue, bool_values),
239 (Int8, I8Value, i8_values),
240 (Int16, I16Value, i16_values),
241 (Int32, I32Value, i32_values),
242 (Int64, I64Value, i64_values),
243 (Uint8, U8Value, u8_values),
244 (Uint16, U16Value, u16_values),
245 (Uint32, U32Value, u32_values),
246 (Uint64, U64Value, u64_values),
247 (Float32, F32Value, f32_values),
248 (Float64, F64Value, f64_values),
249 (Binary, BinaryValue, binary_values),
250 (String, StringValue, string_values),
251 (Json, StringValue, string_values),
252 (Date, DateValue, date_values),
253 (Datetime, DatetimeValue, datetime_values),
254 (
255 TimestampSecond,
256 TimestampSecondValue,
257 timestamp_second_values
258 ),
259 (
260 TimestampMillisecond,
261 TimestampMillisecondValue,
262 timestamp_millisecond_values
263 ),
264 (
265 TimestampMicrosecond,
266 TimestampMicrosecondValue,
267 timestamp_microsecond_values
268 ),
269 (
270 TimestampNanosecond,
271 TimestampNanosecondValue,
272 timestamp_nanosecond_values
273 ),
274 (TimeSecond, TimeSecondValue, time_second_values),
275 (
276 TimeMillisecond,
277 TimeMillisecondValue,
278 time_millisecond_values
279 ),
280 (
281 TimeMicrosecond,
282 TimeMicrosecondValue,
283 time_microsecond_values
284 ),
285 (TimeNanosecond, TimeNanosecondValue, time_nanosecond_values),
286 (
287 IntervalYearMonth,
288 IntervalYearMonthValue,
289 interval_year_month_values
290 ),
291 (
292 IntervalDayTime,
293 IntervalDayTimeValue,
294 interval_day_time_values
295 ),
296 (
297 IntervalMonthDayNano,
298 IntervalMonthDayNanoValue,
299 interval_month_day_nano_values
300 ),
301 (Decimal128, Decimal128Value, decimal128_values),
302 (Vector, BinaryValue, binary_values),
303 (List, ListValue, list_values),
304 (Struct, StructValue, struct_values),
305 );
306
307 Ok(())
308}
309
310pub fn row_count(columns: &HashMap<String, VectorRef>) -> Result<usize> {
311 let mut columns_iter = columns.values();
312
313 let len = columns_iter
314 .next()
315 .map(|column| column.len())
316 .unwrap_or_default();
317 ensure!(
318 columns_iter.all(|column| column.len() == len),
319 InvalidInsertRequestSnafu {
320 reason: "The row count of columns is not the same."
321 }
322 );
323
324 Ok(len)
325}
326
327pub fn column_schema(
328 table_info: &TableInfo,
329 columns: &HashMap<String, VectorRef>,
330) -> Result<Vec<ColumnSchema>> {
331 columns
332 .keys()
333 .map(|column_name| {
334 let column_schema = table_info
335 .meta
336 .schema
337 .column_schema_by_name(column_name)
338 .context(ColumnNotFoundSnafu {
339 msg: format!("unable to find column {column_name} in table schema"),
340 })?;
341
342 let (datatype, datatype_extension) =
343 ColumnDataTypeWrapper::try_from(column_schema.data_type.clone())
344 .context(ColumnDataTypeSnafu)?
345 .to_parts();
346
347 Ok(ColumnSchema {
348 column_name: column_name.clone(),
349 datatype: datatype as i32,
350 semantic_type: semantic_type(table_info, column_name)?.into(),
351 datatype_extension,
352 options: options_from_column_schema(column_schema),
353 })
354 })
355 .collect::<Result<Vec<_>>>()
356}
357
358fn semantic_type(table_info: &TableInfo, column: &str) -> Result<SemanticType> {
359 let table_meta = &table_info.meta;
360 let table_schema = &table_meta.schema;
361
362 let time_index_column = &table_schema
363 .timestamp_column()
364 .with_context(|| table::error::MissingTimeIndexColumnSnafu {
365 table_name: table_info.name.clone(),
366 })
367 .context(MissingTimeIndexColumnSnafu)?
368 .name;
369
370 let semantic_type = if column == time_index_column {
371 SemanticType::Timestamp
372 } else {
373 let column_index = table_schema.column_index_by_name(column);
374 let column_index = column_index.context(ColumnNotFoundSnafu {
375 msg: format!("unable to find column {column} in table schema"),
376 })?;
377
378 if table_meta.primary_key_indices.contains(&column_index) {
379 SemanticType::Tag
380 } else {
381 SemanticType::Field
382 }
383 };
384
385 Ok(semantic_type)
386}
387
#[cfg(test)]
mod tests {
    use api::v1::column::Values;
    use api::v1::{SemanticType, VectorTypeExtension};
    use common_base::bit_vec::prelude::*;

    use super::*;

    /// Builds an `Int32` field column named `col1` from a raw null mask and a value set.
    fn int32_col1(null_mask: Vec<u8>, values: Values) -> Column {
        Column {
            column_name: String::from("col1"),
            datatype: ColumnDataType::Int32.into(),
            semantic_type: SemanticType::Field.into(),
            null_mask,
            values: Some(values),
            ..Default::default()
        }
    }

    #[test]
    fn test_request_column_to_row() {
        let vector_ext = Some(ColumnDataTypeExtension {
            type_ext: Some(TypeExt::VectorType(VectorTypeExtension { dim: 1 })),
        });

        let columns = vec![
            int32_col1(
                bitvec![u8, Lsb0; 1, 0, 1].into_vec(),
                Values {
                    i32_values: vec![42],
                    ..Default::default()
                },
            ),
            Column {
                column_name: String::from("col2"),
                datatype: ColumnDataType::String.into(),
                semantic_type: SemanticType::Tag.into(),
                null_mask: vec![],
                values: Some(Values {
                    string_values: vec![
                        String::from("value1"),
                        String::from("value2"),
                        String::from("value3"),
                    ],
                    ..Default::default()
                }),
                ..Default::default()
            },
            Column {
                column_name: String::from("col3"),
                datatype: ColumnDataType::Vector.into(),
                semantic_type: SemanticType::Field.into(),
                null_mask: vec![],
                values: Some(Values {
                    binary_values: vec![vec![0; 4], vec![1; 4], vec![2; 4]],
                    ..Default::default()
                }),
                datatype_extension: vector_ext.clone(),
                ..Default::default()
            },
        ];

        let rows = columns_to_rows(columns, 3).unwrap();

        // Schema: one entry per column, in input order.
        let expected_schema = [
            ("col1", ColumnDataType::Int32, SemanticType::Field),
            ("col2", ColumnDataType::String, SemanticType::Tag),
            ("col3", ColumnDataType::Vector, SemanticType::Field),
        ];
        assert_eq!(rows.schema.len(), expected_schema.len());
        for (schema, (name, datatype, semantic)) in rows.schema.iter().zip(expected_schema) {
            assert_eq!(schema.column_name, name);
            assert_eq!(schema.datatype, datatype as i32);
            assert_eq!(schema.semantic_type, semantic as i32);
        }
        assert_eq!(rows.schema[2].datatype_extension, vector_ext);

        // Rows: `col1` is null in the first and last row.
        let expected_rows = [
            [
                None,
                Some(ValueData::StringValue(String::from("value1"))),
                Some(ValueData::BinaryValue(vec![0; 4])),
            ],
            [
                Some(ValueData::I32Value(42)),
                Some(ValueData::StringValue(String::from("value2"))),
                Some(ValueData::BinaryValue(vec![1; 4])),
            ],
            [
                None,
                Some(ValueData::StringValue(String::from("value3"))),
                Some(ValueData::BinaryValue(vec![2; 4])),
            ],
        ];
        assert_eq!(rows.rows.len(), expected_rows.len());
        for (row, expected) in rows.rows.iter().zip(expected_rows) {
            let actual: Vec<_> = row
                .values
                .iter()
                .map(|value| value.value_data.clone())
                .collect();
            assert_eq!(actual, expected);
        }

        // Values stored under a field that does not match the declared datatype.
        let mismatched_type = int32_col1(
            bitvec![u8, Lsb0; 1, 0, 1].into_vec(),
            Values {
                i8_values: vec![42],
                ..Default::default()
            },
        );
        assert!(columns_to_rows(vec![mismatched_type], 3).is_err());

        // One null plus one value cannot fill three rows.
        let too_few_nulls = int32_col1(
            bitvec![u8, Lsb0; 0, 0, 1].into_vec(),
            Values {
                i32_values: vec![42],
                ..Default::default()
            },
        );
        assert!(columns_to_rows(vec![too_few_nulls], 3).is_err());

        // No null mask and a single value cannot fill three rows.
        let too_few_values = int32_col1(
            vec![],
            Values {
                i32_values: vec![42],
                ..Default::default()
            },
        );
        assert!(columns_to_rows(vec![too_few_values], 3).is_err());
    }

    #[test]
    fn test_validate_vector_row_success() {
        // (byte length, dimension) pairs where length == dimension * 4.
        for (len, dim) in [(4usize, 1u32), (8, 2), (12, 3)] {
            let data = ValueData::BinaryValue(vec![0; len]);
            assert!(validate_vector_col(&data, dim).is_ok());
        }
    }

    #[test]
    fn test_validate_vector_row_fail_wrong_type() {
        let not_binary = ValueData::I32Value(42);
        assert!(validate_vector_col(&not_binary, 1).is_err());
    }

    #[test]
    fn test_validate_vector_row_fail_wrong_length() {
        // (byte length, dimension) pairs where length != dimension * 4.
        for (len, dim) in [(8usize, 1u32), (4, 2)] {
            let data = ValueData::BinaryValue(vec![0; len]);
            assert!(validate_vector_col(&data, dim).is_err());
        }
    }
}