1pub(crate) mod partitioner;
16
17use std::collections::HashMap;
18
19use api::helper::ColumnDataTypeWrapper;
20use api::v1::column_data_type_extension::TypeExt;
21use api::v1::column_def::options_from_column_schema;
22use api::v1::value::ValueData;
23use api::v1::{
24 Column, ColumnDataType, ColumnDataTypeExtension, ColumnSchema, JsonTypeExtension, Row,
25 RowDeleteRequest, RowInsertRequest, Rows, SemanticType, Value,
26};
27use common_base::BitVec;
28use datatypes::prelude::ConcreteDataType;
29use datatypes::vectors::VectorRef;
30use snafu::ResultExt;
31use snafu::prelude::*;
32use table::metadata::TableInfo;
33
34use crate::error::{
35 ColumnDataTypeSnafu, ColumnNotFoundSnafu, InvalidInsertRequestSnafu, InvalidJsonFormatSnafu,
36 MissingTimeIndexColumnSnafu, Result, UnexpectedSnafu,
37};
38
39fn encode_string_to_jsonb_binary(value_data: ValueData) -> Result<ValueData> {
41 if let ValueData::StringValue(json) = &value_data {
42 let binary = jsonb::parse_value(json.as_bytes())
43 .map_err(|_| InvalidJsonFormatSnafu { json }.build())
44 .map(|jsonb| jsonb.to_vec())?;
45 Ok(ValueData::BinaryValue(binary))
46 } else {
47 UnexpectedSnafu {
48 violated: "Expected to value data to be a string.",
49 }
50 .fail()
51 }
52}
53
54pub fn preprocess_row_insert_requests(requests: &mut Vec<RowInsertRequest>) -> Result<()> {
56 for request in requests {
57 validate_rows(&request.rows)?;
58 prepare_rows(&mut request.rows)?;
59 }
60
61 Ok(())
62}
63
64pub fn preprocess_row_delete_requests(requests: &mut Vec<RowDeleteRequest>) -> Result<()> {
66 for request in requests {
67 validate_rows(&request.rows)?;
68 prepare_rows(&mut request.rows)?;
69 }
70
71 Ok(())
72}
73
74fn prepare_rows(rows: &mut Option<Rows>) -> Result<()> {
75 if let Some(rows) = rows {
76 let indexes = rows
77 .schema
78 .iter()
79 .enumerate()
80 .filter_map(|(idx, schema)| {
81 if schema.datatype() == ColumnDataType::Json {
82 Some(idx)
83 } else {
84 None
85 }
86 })
87 .collect::<Vec<_>>();
88 for idx in &indexes {
89 let column = &mut rows.schema[*idx];
90 column.datatype_extension = Some(ColumnDataTypeExtension {
91 type_ext: Some(TypeExt::JsonType(JsonTypeExtension::JsonBinary.into())),
92 });
93 column.datatype = ColumnDataType::Json.into();
94 }
95
96 for idx in &indexes {
97 for row in &mut rows.rows {
98 if let Some(value_data) = row.values[*idx].value_data.take() {
99 row.values[*idx].value_data = Some(encode_string_to_jsonb_binary(value_data)?);
100 }
101 }
102 }
103 }
104
105 Ok(())
106}
107
108fn validate_rows(rows: &Option<Rows>) -> Result<()> {
109 let Some(rows) = rows else {
110 return Ok(());
111 };
112
113 for (col_idx, schema) in rows.schema.iter().enumerate() {
114 let column_type =
115 ColumnDataTypeWrapper::try_new(schema.datatype, schema.datatype_extension.clone())
116 .context(ColumnDataTypeSnafu)?
117 .into();
118
119 let ConcreteDataType::Vector(d) = column_type else {
120 return Ok(());
121 };
122
123 for row in &rows.rows {
124 let value = &row.values[col_idx].value_data;
125 if let Some(data) = value {
126 validate_vector_col(data, d.dim)?;
127 }
128 }
129 }
130
131 Ok(())
132}
133
134fn validate_vector_col(data: &ValueData, dim: u32) -> Result<()> {
135 let data = match data {
136 ValueData::BinaryValue(data) => data,
137 _ => {
138 return InvalidInsertRequestSnafu {
139 reason: "Expecting binary data for vector column.".to_string(),
140 }
141 .fail();
142 }
143 };
144
145 let expected_len = dim as usize * std::mem::size_of::<f32>();
146 if data.len() != expected_len {
147 return InvalidInsertRequestSnafu {
148 reason: format!(
149 "Expecting {} bytes of data for vector column, but got {}.",
150 expected_len,
151 data.len()
152 ),
153 }
154 .fail();
155 }
156
157 Ok(())
158}
159
160pub fn columns_to_rows(columns: Vec<Column>, row_count: u32) -> Result<Rows> {
161 let row_count = row_count as usize;
162 let column_count = columns.len();
163 let mut schema = Vec::with_capacity(column_count);
164 let mut rows = vec![
165 Row {
166 values: Vec::with_capacity(column_count)
167 };
168 row_count
169 ];
170 for column in columns {
171 let column_schema = ColumnSchema {
172 column_name: column.column_name.clone(),
173 datatype: column.datatype,
174 semantic_type: column.semantic_type,
175 datatype_extension: column.datatype_extension.clone(),
176 options: column.options.clone(),
177 };
178 schema.push(column_schema);
179
180 push_column_to_rows(column, &mut rows)?;
181 }
182
183 Ok(Rows { schema, rows })
184}
185
186fn push_column_to_rows(column: Column, rows: &mut [Row]) -> Result<()> {
187 let null_mask = BitVec::from_vec(column.null_mask);
188 let column_type = ColumnDataTypeWrapper::try_new(column.datatype, column.datatype_extension)
189 .context(ColumnDataTypeSnafu)?
190 .datatype();
191 let column_values = column.values.unwrap_or_default();
192
193 macro_rules! push_column_values_match_types {
194 ($( ($arm:tt, $value_data_variant:tt, $field_name:tt), )*) => { match column_type { $(
195
196 ColumnDataType::$arm => {
197 let row_count = rows.len();
198 let actual_row_count = null_mask.count_ones() + column_values.$field_name.len();
199 ensure!(
200 actual_row_count == row_count,
201 InvalidInsertRequestSnafu {
202 reason: format!(
203 "Expecting {} rows of data for column '{}', but got {}.",
204 row_count, column.column_name, actual_row_count
205 ),
206 }
207 );
208
209 let mut null_mask_iter = null_mask.into_iter();
210 let mut values_iter = column_values.$field_name.into_iter();
211
212 for row in rows {
213 let value_is_null = null_mask_iter.next();
214 if value_is_null == Some(true) {
215 row.values.push(Value { value_data: None });
216 } else {
217 let value = values_iter.next().unwrap();
219 row.values.push(Value {
220 value_data: Some(ValueData::$value_data_variant(value)),
221 });
222 }
223 }
224 }
225
226 )* }}
227 }
228
229 push_column_values_match_types!(
230 (Boolean, BoolValue, bool_values),
231 (Int8, I8Value, i8_values),
232 (Int16, I16Value, i16_values),
233 (Int32, I32Value, i32_values),
234 (Int64, I64Value, i64_values),
235 (Uint8, U8Value, u8_values),
236 (Uint16, U16Value, u16_values),
237 (Uint32, U32Value, u32_values),
238 (Uint64, U64Value, u64_values),
239 (Float32, F32Value, f32_values),
240 (Float64, F64Value, f64_values),
241 (Binary, BinaryValue, binary_values),
242 (String, StringValue, string_values),
243 (Json, StringValue, string_values),
244 (Date, DateValue, date_values),
245 (Datetime, DatetimeValue, datetime_values),
246 (
247 TimestampSecond,
248 TimestampSecondValue,
249 timestamp_second_values
250 ),
251 (
252 TimestampMillisecond,
253 TimestampMillisecondValue,
254 timestamp_millisecond_values
255 ),
256 (
257 TimestampMicrosecond,
258 TimestampMicrosecondValue,
259 timestamp_microsecond_values
260 ),
261 (
262 TimestampNanosecond,
263 TimestampNanosecondValue,
264 timestamp_nanosecond_values
265 ),
266 (TimeSecond, TimeSecondValue, time_second_values),
267 (
268 TimeMillisecond,
269 TimeMillisecondValue,
270 time_millisecond_values
271 ),
272 (
273 TimeMicrosecond,
274 TimeMicrosecondValue,
275 time_microsecond_values
276 ),
277 (TimeNanosecond, TimeNanosecondValue, time_nanosecond_values),
278 (
279 IntervalYearMonth,
280 IntervalYearMonthValue,
281 interval_year_month_values
282 ),
283 (
284 IntervalDayTime,
285 IntervalDayTimeValue,
286 interval_day_time_values
287 ),
288 (
289 IntervalMonthDayNano,
290 IntervalMonthDayNanoValue,
291 interval_month_day_nano_values
292 ),
293 (Decimal128, Decimal128Value, decimal128_values),
294 (Vector, BinaryValue, binary_values),
295 (List, ListValue, list_values),
296 (Struct, StructValue, struct_values),
297 );
298
299 Ok(())
300}
301
302pub fn row_count(columns: &HashMap<String, VectorRef>) -> Result<usize> {
303 let mut columns_iter = columns.values();
304
305 let len = columns_iter
306 .next()
307 .map(|column| column.len())
308 .unwrap_or_default();
309 ensure!(
310 columns_iter.all(|column| column.len() == len),
311 InvalidInsertRequestSnafu {
312 reason: "The row count of columns is not the same."
313 }
314 );
315
316 Ok(len)
317}
318
319pub fn column_schema(
320 table_info: &TableInfo,
321 columns: &HashMap<String, VectorRef>,
322) -> Result<Vec<ColumnSchema>> {
323 columns
324 .keys()
325 .map(|column_name| {
326 let column_schema = table_info
327 .meta
328 .schema
329 .column_schema_by_name(column_name)
330 .context(ColumnNotFoundSnafu {
331 msg: format!("unable to find column {column_name} in table schema"),
332 })?;
333
334 let (datatype, datatype_extension) =
335 ColumnDataTypeWrapper::try_from(column_schema.data_type.clone())
336 .context(ColumnDataTypeSnafu)?
337 .to_parts();
338
339 Ok(ColumnSchema {
340 column_name: column_name.clone(),
341 datatype: datatype as i32,
342 semantic_type: semantic_type(table_info, column_name)?.into(),
343 datatype_extension,
344 options: options_from_column_schema(column_schema),
345 })
346 })
347 .collect::<Result<Vec<_>>>()
348}
349
350fn semantic_type(table_info: &TableInfo, column: &str) -> Result<SemanticType> {
351 let table_meta = &table_info.meta;
352 let table_schema = &table_meta.schema;
353
354 let time_index_column = &table_schema
355 .timestamp_column()
356 .with_context(|| table::error::MissingTimeIndexColumnSnafu {
357 table_name: table_info.name.clone(),
358 })
359 .context(MissingTimeIndexColumnSnafu)?
360 .name;
361
362 let semantic_type = if column == time_index_column {
363 SemanticType::Timestamp
364 } else {
365 let column_index = table_schema.column_index_by_name(column);
366 let column_index = column_index.context(ColumnNotFoundSnafu {
367 msg: format!("unable to find column {column} in table schema"),
368 })?;
369
370 if table_meta.primary_key_indices.contains(&column_index) {
371 SemanticType::Tag
372 } else {
373 SemanticType::Field
374 }
375 };
376
377 Ok(semantic_type)
378}
379
#[cfg(test)]
mod tests {
    use api::v1::column::Values;
    use api::v1::{SemanticType, VectorTypeExtension};
    use common_base::bit_vec::prelude::*;

    use super::*;

    /// Converts three columns (a nullable Int32 field, a String tag and a
    /// 1-dim Vector field) into rows and checks the resulting schema and cells.
    /// It then checks that columns whose data does not cover `row_count` rows
    /// are rejected.
    #[test]
    fn test_request_column_to_row() {
        let columns = vec![
            Column {
                column_name: String::from("col1"),
                datatype: ColumnDataType::Int32.into(),
                semantic_type: SemanticType::Field.into(),
                // Rows 0 and 2 are null; the single value 42 fills row 1.
                null_mask: bitvec![u8, Lsb0; 1, 0, 1].into_vec(),
                values: Some(Values {
                    i32_values: vec![42],
                    ..Default::default()
                }),
                ..Default::default()
            },
            Column {
                column_name: String::from("col2"),
                datatype: ColumnDataType::String.into(),
                semantic_type: SemanticType::Tag.into(),
                // Empty mask: every row takes a value.
                null_mask: vec![],
                values: Some(Values {
                    string_values: vec![
                        String::from("value1"),
                        String::from("value2"),
                        String::from("value3"),
                    ],
                    ..Default::default()
                }),
                ..Default::default()
            },
            Column {
                column_name: String::from("col3"),
                datatype: ColumnDataType::Vector.into(),
                semantic_type: SemanticType::Field.into(),
                null_mask: vec![],
                // Vector cells are carried as raw bytes in `binary_values`;
                // one f32 (dim = 1) is 4 bytes.
                values: Some(Values {
                    binary_values: vec![vec![0; 4], vec![1; 4], vec![2; 4]],
                    ..Default::default()
                }),
                datatype_extension: Some(ColumnDataTypeExtension {
                    type_ext: Some(TypeExt::VectorType(VectorTypeExtension { dim: 1 })),
                }),
                ..Default::default()
            },
        ];
        let row_count = 3;

        let result = columns_to_rows(columns, row_count);
        let rows = result.unwrap();

        // Schema mirrors the input columns, in order, including the extension.
        assert_eq!(rows.schema.len(), 3);
        assert_eq!(rows.schema[0].column_name, "col1");
        assert_eq!(rows.schema[0].datatype, ColumnDataType::Int32 as i32);
        assert_eq!(rows.schema[0].semantic_type, SemanticType::Field as i32);
        assert_eq!(rows.schema[1].column_name, "col2");
        assert_eq!(rows.schema[1].datatype, ColumnDataType::String as i32);
        assert_eq!(rows.schema[1].semantic_type, SemanticType::Tag as i32);
        assert_eq!(rows.schema[2].column_name, "col3");
        assert_eq!(rows.schema[2].datatype, ColumnDataType::Vector as i32);
        assert_eq!(rows.schema[2].semantic_type, SemanticType::Field as i32);
        assert_eq!(
            rows.schema[2].datatype_extension,
            Some(ColumnDataTypeExtension {
                type_ext: Some(TypeExt::VectorType(VectorTypeExtension { dim: 1 }))
            })
        );

        assert_eq!(rows.rows.len(), 3);

        // Row 0: col1 is null.
        assert_eq!(rows.rows[0].values.len(), 3);
        assert_eq!(rows.rows[0].values[0].value_data, None);
        assert_eq!(
            rows.rows[0].values[1].value_data,
            Some(ValueData::StringValue(String::from("value1")))
        );
        assert_eq!(
            rows.rows[0].values[2].value_data,
            Some(ValueData::BinaryValue(vec![0; 4]))
        );

        // Row 1: col1 takes the only non-null value.
        assert_eq!(rows.rows[1].values.len(), 3);
        assert_eq!(
            rows.rows[1].values[0].value_data,
            Some(ValueData::I32Value(42))
        );
        assert_eq!(
            rows.rows[1].values[1].value_data,
            Some(ValueData::StringValue(String::from("value2")))
        );
        assert_eq!(
            rows.rows[1].values[2].value_data,
            Some(ValueData::BinaryValue(vec![1; 4]))
        );

        // Row 2: col1 is null again.
        assert_eq!(rows.rows[2].values.len(), 3);
        assert_eq!(rows.rows[2].values[0].value_data, None);
        assert_eq!(
            rows.rows[2].values[1].value_data,
            Some(ValueData::StringValue(String::from("value3")))
        );
        assert_eq!(
            rows.rows[2].values[2].value_data,
            Some(ValueData::BinaryValue(vec![2; 4]))
        );

        // Wrong value field: the datatype is Int32 but the value sits in
        // `i8_values`. No Int32 values are found, so 2 nulls + 0 values != 3 rows.
        let columns = vec![Column {
            column_name: String::from("col1"),
            datatype: ColumnDataType::Int32.into(),
            semantic_type: SemanticType::Field.into(),
            null_mask: bitvec![u8, Lsb0; 1, 0, 1].into_vec(),
            values: Some(Values {
                i8_values: vec![42],
                ..Default::default()
            }),
            ..Default::default()
        }];
        let row_count = 3;
        assert!(columns_to_rows(columns, row_count).is_err());

        // Too few values: 1 null + 1 value does not cover 3 rows.
        let columns = vec![Column {
            column_name: String::from("col1"),
            datatype: ColumnDataType::Int32.into(),
            semantic_type: SemanticType::Field.into(),
            null_mask: bitvec![u8, Lsb0; 0, 0, 1].into_vec(),
            values: Some(Values {
                i32_values: vec![42],
                ..Default::default()
            }),
            ..Default::default()
        }];
        let row_count = 3;
        assert!(columns_to_rows(columns, row_count).is_err());

        // No null mask and only 1 value for 3 rows.
        let columns = vec![Column {
            column_name: String::from("col1"),
            datatype: ColumnDataType::Int32.into(),
            semantic_type: SemanticType::Field.into(),
            null_mask: vec![],
            values: Some(Values {
                i32_values: vec![42],
                ..Default::default()
            }),
            ..Default::default()
        }];
        let row_count = 3;
        assert!(columns_to_rows(columns, row_count).is_err());
    }

    /// A binary cell of exactly `dim * 4` bytes (dim f32s) is accepted.
    #[test]
    fn test_validate_vector_row_success() {
        let data = ValueData::BinaryValue(vec![0; 4]);
        let dim = 1;
        assert!(validate_vector_col(&data, dim).is_ok());

        let data = ValueData::BinaryValue(vec![0; 8]);
        let dim = 2;
        assert!(validate_vector_col(&data, dim).is_ok());

        let data = ValueData::BinaryValue(vec![0; 12]);
        let dim = 3;
        assert!(validate_vector_col(&data, dim).is_ok());
    }

    /// A non-binary cell is rejected for a vector column.
    #[test]
    fn test_validate_vector_row_fail_wrong_type() {
        let data = ValueData::I32Value(42);
        let dim = 1;
        assert!(validate_vector_col(&data, dim).is_err());
    }

    /// Binary cells that are too long or too short for `dim` are rejected.
    #[test]
    fn test_validate_vector_row_fail_wrong_length() {
        let data = ValueData::BinaryValue(vec![0; 8]);
        let dim = 1;
        assert!(validate_vector_col(&data, dim).is_err());

        let data = ValueData::BinaryValue(vec![0; 4]);
        let dim = 2;
        assert!(validate_vector_col(&data, dim).is_err());
    }
}