1use std::collections::{HashMap, HashSet};
20use std::sync::Arc;
21
22use api::helper::ColumnDataTypeWrapper;
23use api::v1::value::ValueData;
24use api::v1::{ColumnSchema, Rows, SemanticType};
25use arrow::array::{
26 ArrayRef, Float64Builder, StringBuilder, TimestampMicrosecondBuilder,
27 TimestampMillisecondBuilder, TimestampNanosecondBuilder, TimestampSecondBuilder,
28 new_null_array,
29};
30use arrow::datatypes::{DataType as ArrowDataType, Schema as ArrowSchema};
31use arrow::record_batch::RecordBatch;
32use arrow_schema::TimeUnit;
33use common_query::prelude::{greptime_timestamp, greptime_value};
34use datatypes::data_type::DataType;
35use datatypes::prelude::ConcreteDataType;
36use snafu::{OptionExt, ResultExt, ensure};
37
38use crate::error;
39use crate::error::Result;
40
41fn unzip_logical_region_schema(
43 target_schema: &ArrowSchema,
44) -> Result<(String, String, HashSet<String>)> {
45 let mut timestamp_column = None;
46 let mut field_column = None;
47 let mut tag_columns = HashSet::with_capacity(target_schema.fields.len().saturating_sub(2));
48 for field in target_schema.fields() {
49 if field.name() == greptime_timestamp() {
50 timestamp_column = Some(field.name().clone());
51 continue;
52 }
53
54 if field.name() == greptime_value() {
55 field_column = Some(field.name().clone());
56 continue;
57 }
58
59 if timestamp_column.is_none() && matches!(field.data_type(), ArrowDataType::Timestamp(_, _))
60 {
61 timestamp_column = Some(field.name().clone());
62 continue;
63 }
64
65 if field_column.is_none() && matches!(field.data_type(), ArrowDataType::Float64) {
66 field_column = Some(field.name().clone());
67 continue;
68 }
69 tag_columns.insert(field.name().clone());
70 }
71
72 let timestamp_column = timestamp_column.with_context(|| error::UnexpectedResultSnafu {
73 reason: "Failed to locate timestamp column in target schema".to_string(),
74 })?;
75 let field_column = field_column.with_context(|| error::UnexpectedResultSnafu {
76 reason: "Failed to locate field column in target schema".to_string(),
77 })?;
78
79 Ok((timestamp_column, field_column, tag_columns))
80}
81
82pub(crate) fn rows_to_aligned_record_batch(
86 rows: &Rows,
87 target_schema: &ArrowSchema,
88) -> Result<RecordBatch> {
89 let row_count = rows.rows.len();
90 let column_count = rows.schema.len();
91
92 for (idx, row) in rows.rows.iter().enumerate() {
93 ensure!(
94 row.values.len() == column_count,
95 error::InternalSnafu {
96 err_msg: format!(
97 "Column count mismatch in row {}, expected {}, got {}",
98 idx,
99 column_count,
100 row.values.len()
101 )
102 }
103 );
104 }
105
106 let (target_ts_name, target_field_name, _target_tags) =
107 unzip_logical_region_schema(target_schema)?;
108
109 let mut source_map: HashMap<&str, (usize, ArrowDataType)> =
112 HashMap::with_capacity(rows.schema.len());
113
114 for (src_idx, col) in rows.schema.iter().enumerate() {
115 let wrapper = ColumnDataTypeWrapper::try_new(col.datatype, col.datatype_extension.clone())?;
116 let src_arrow_type = ConcreteDataType::from(wrapper).as_arrow_type();
117
118 match &src_arrow_type {
119 ArrowDataType::Float64 => {
120 source_map.insert(&target_field_name, (src_idx, src_arrow_type));
121 }
122 ArrowDataType::Timestamp(unit, _) => {
123 ensure!(
124 unit == &TimeUnit::Millisecond,
125 error::InvalidPromRemoteRequestSnafu {
126 msg: format!(
127 "Unexpected remote write batch timestamp unit, expect millisecond, got: {}",
128 unit
129 )
130 }
131 );
132 source_map.insert(&target_ts_name, (src_idx, src_arrow_type));
133 }
134 ArrowDataType::Utf8 => {
135 source_map.insert(&col.column_name, (src_idx, src_arrow_type));
136 }
137 other => {
138 return error::InvalidPromRemoteRequestSnafu {
139 msg: format!(
140 "Unexpected remote write batch field type {}, field name: {}",
141 other, col.column_name
142 ),
143 }
144 .fail();
145 }
146 }
147 }
148
149 let mut columns = Vec::with_capacity(target_schema.fields().len());
151 for target_field in target_schema.fields() {
152 if let Some((src_idx, src_arrow_type)) = source_map.get(target_field.name().as_str()) {
153 let array = build_arrow_array(
154 rows,
155 *src_idx,
156 &rows.schema[*src_idx].column_name,
157 src_arrow_type.clone(),
158 row_count,
159 )?;
160 columns.push(array);
161 } else {
162 columns.push(new_null_array(target_field.data_type(), row_count));
163 }
164 }
165
166 let batch = RecordBatch::try_new(Arc::new(target_schema.clone()), columns)
167 .context(error::ArrowSnafu)?;
168 Ok(batch)
169}
170
171pub(crate) fn identify_missing_columns_from_proto(
174 rows_schema: &[ColumnSchema],
175 target_schema: &ArrowSchema,
176) -> Result<Vec<String>> {
177 let (_, _, target_tags) = unzip_logical_region_schema(target_schema)?;
178 let mut missing = Vec::new();
179 for col in rows_schema {
180 let wrapper = ColumnDataTypeWrapper::try_new(col.datatype, col.datatype_extension.clone())?;
181 let arrow_type = ConcreteDataType::from(wrapper).as_arrow_type();
182 if matches!(arrow_type, ArrowDataType::Utf8)
183 && !target_tags.contains(&col.column_name)
184 && target_schema.column_with_name(&col.column_name).is_none()
185 {
186 missing.push(col.column_name.clone());
187 }
188 }
189 Ok(missing)
190}
191
192pub fn build_prom_create_table_schema_from_proto(
195 rows_schema: &[ColumnSchema],
196) -> Result<Vec<ColumnSchema>> {
197 rows_schema
198 .iter()
199 .map(|col| {
200 let semantic_type = if col.datatype == api::v1::ColumnDataType::TimestampMillisecond as i32 {
201 SemanticType::Timestamp
202 } else if col.datatype == api::v1::ColumnDataType::Float64 as i32 {
203 SemanticType::Field
204 } else {
205 ensure!(col.datatype == api::v1::ColumnDataType::String as i32, error::InvalidPromRemoteRequestSnafu{
207 msg: format!(
208 "Failed to build create table schema, tag column '{}' must be String but got datatype {}",
209 col.column_name, col.datatype
210 )
211 });
212 SemanticType::Tag
213 };
214
215 Ok(ColumnSchema {
216 column_name: col.column_name.clone(),
217 datatype: col.datatype,
218 semantic_type: semantic_type as i32,
219 datatype_extension: col.datatype_extension.clone(),
220 options: None,
221 })
222 })
223 .collect()
224}
225
226fn build_arrow_array(
228 rows: &Rows,
229 col_idx: usize,
230 column_name: &String,
231 column_data_type: arrow::datatypes::DataType,
232 row_count: usize,
233) -> Result<ArrayRef> {
234 macro_rules! build_array {
235 ($builder:expr, $( $pattern:pat => $value:expr ),+ $(,)?) => {{
236 let mut builder = $builder;
237 for row in &rows.rows {
238 match row.values[col_idx].value_data.as_ref() {
239 $(Some($pattern) => builder.append_value($value),)+
240 Some(v) => {
241 return error::InvalidPromRemoteRequestSnafu {
242 msg: format!("Unexpected value: {:?}", v),
243 }
244 .fail();
245 }
246 None => builder.append_null(),
247 }
248 }
249 Arc::new(builder.finish()) as ArrayRef
250 }};
251 }
252
253 let array: ArrayRef = match column_data_type {
254 arrow::datatypes::DataType::Float64 => {
255 build_array!(Float64Builder::with_capacity(row_count), ValueData::F64Value(v) => *v)
256 }
257 arrow::datatypes::DataType::Utf8 => build_array!(
258 StringBuilder::with_capacity(row_count, 0),
259 ValueData::StringValue(v) => v
260 ),
261 arrow::datatypes::DataType::Timestamp(u, _) => match u {
262 TimeUnit::Second => build_array!(
263 TimestampSecondBuilder::with_capacity(row_count),
264 ValueData::TimestampSecondValue(v) => *v
265 ),
266 TimeUnit::Millisecond => build_array!(
267 TimestampMillisecondBuilder::with_capacity(row_count),
268 ValueData::TimestampMillisecondValue(v) => *v
269 ),
270 TimeUnit::Microsecond => build_array!(
271 TimestampMicrosecondBuilder::with_capacity(row_count),
272 ValueData::DatetimeValue(v) => *v,
273 ValueData::TimestampMicrosecondValue(v) => *v
274 ),
275 TimeUnit::Nanosecond => build_array!(
276 TimestampNanosecondBuilder::with_capacity(row_count),
277 ValueData::TimestampNanosecondValue(v) => *v
278 ),
279 },
280 ty => {
281 return error::InvalidPromRemoteRequestSnafu {
282 msg: format!(
283 "Unexpected column type {:?}, column name: {}",
284 ty, column_name
285 ),
286 }
287 .fail();
288 }
289 };
290
291 Ok(array)
292}
293
#[cfg(test)]
mod tests {
    //! Unit tests for the protobuf-rows to Arrow conversion helpers.
    //!
    //! Fixtures follow one layout: a `greptime_timestamp` column, zero or more
    //! string tag columns, then a `greptime_value` column. Target schemas use the
    //! names `my_ts` / `my_value` so that renaming is exercised.

    use api::v1::value::ValueData;
    use api::v1::{ColumnDataType, ColumnSchema, Row, Rows, SemanticType, Value};
    use arrow::array::{Array, Float64Array, StringArray, TimestampMillisecondArray};
    use arrow::datatypes::{DataType, Field, Schema as ArrowSchema, TimeUnit};

    use super::{
        build_prom_create_table_schema_from_proto, identify_missing_columns_from_proto,
        rows_to_aligned_record_batch,
    };

    /// Builds a protobuf column schema with default extension and options.
    fn column(name: &str, datatype: ColumnDataType, semantic_type: SemanticType) -> ColumnSchema {
        ColumnSchema {
            column_name: name.to_string(),
            datatype: datatype as i32,
            semantic_type: semantic_type as i32,
            ..Default::default()
        }
    }

    /// Source schema: `greptime_timestamp`, the given tags in order, `greptime_value`.
    fn proto_schema(tags: &[&str]) -> Vec<ColumnSchema> {
        let mut schema = Vec::with_capacity(tags.len() + 2);
        schema.push(column(
            "greptime_timestamp",
            ColumnDataType::TimestampMillisecond,
            SemanticType::Timestamp,
        ));
        schema.extend(
            tags.iter()
                .map(|tag| column(tag, ColumnDataType::String, SemanticType::Tag)),
        );
        schema.push(column(
            "greptime_value",
            ColumnDataType::Float64,
            SemanticType::Field,
        ));
        schema
    }

    /// Source row matching [`proto_schema`]: timestamp, tag values in order, value.
    fn proto_row(ts_millis: i64, tag_values: &[&str], value: f64) -> Row {
        let mut values = Vec::with_capacity(tag_values.len() + 2);
        values.push(Value {
            value_data: Some(ValueData::TimestampMillisecondValue(ts_millis)),
        });
        values.extend(tag_values.iter().map(|tag| Value {
            value_data: Some(ValueData::StringValue((*tag).to_string())),
        }));
        values.push(Value {
            value_data: Some(ValueData::F64Value(value)),
        });
        Row { values }
    }

    /// Non-nullable millisecond timestamp field of the target schema.
    fn ts_field() -> Field {
        Field::new(
            "my_ts",
            DataType::Timestamp(TimeUnit::Millisecond, None),
            false,
        )
    }

    /// Nullable string tag field of the target schema.
    fn tag_field(name: &str) -> Field {
        Field::new(name, DataType::Utf8, true)
    }

    /// Nullable float value field of the target schema.
    fn value_field() -> Field {
        Field::new("my_value", DataType::Float64, true)
    }

    #[test]
    fn test_rows_to_aligned_record_batch_renames_and_reorders() {
        let rows = Rows {
            schema: proto_schema(&["host"]),
            rows: vec![
                proto_row(1000, &["h1"], 42.0),
                proto_row(2000, &["h2"], 99.0),
            ],
        };

        // The target deliberately uses a different column order than the source
        // (value, tag, timestamp) so that reordering is actually exercised.
        let target = ArrowSchema::new(vec![value_field(), tag_field("host"), ts_field()]);

        let batch = rows_to_aligned_record_batch(&rows, &target).unwrap();
        assert_eq!(batch.schema().as_ref(), &target);
        assert_eq!(2, batch.num_rows());
        assert_eq!(3, batch.num_columns());

        let values = batch
            .column(0)
            .as_any()
            .downcast_ref::<Float64Array>()
            .unwrap();
        assert_eq!(values.value(0), 42.0);
        assert_eq!(values.value(1), 99.0);

        let hosts = batch
            .column(1)
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap();
        assert_eq!(hosts.value(0), "h1");
        assert_eq!(hosts.value(1), "h2");

        let ts = batch
            .column(2)
            .as_any()
            .downcast_ref::<TimestampMillisecondArray>()
            .unwrap();
        assert_eq!(ts.value(0), 1000);
        assert_eq!(ts.value(1), 2000);
    }

    #[test]
    fn test_rows_to_aligned_record_batch_fills_nulls() {
        let rows = Rows {
            schema: proto_schema(&["host", "instance"]),
            rows: vec![proto_row(1000, &["h1", "i1"], 1.0)],
        };

        // `region` exists only in the target; `instance` exists only in the source.
        let target = ArrowSchema::new(vec![
            ts_field(),
            tag_field("host"),
            tag_field("region"),
            value_field(),
        ]);

        let batch = rows_to_aligned_record_batch(&rows, &target).unwrap();
        assert_eq!(batch.schema().as_ref(), &target);
        assert_eq!(1, batch.num_rows());
        assert_eq!(4, batch.num_columns());

        // Columns present on both sides keep their data.
        let ts = batch
            .column(0)
            .as_any()
            .downcast_ref::<TimestampMillisecondArray>()
            .unwrap();
        assert_eq!(ts.value(0), 1000);

        let hosts = batch
            .column(1)
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap();
        assert!(!hosts.is_null(0));
        assert_eq!(hosts.value(0), "h1");

        // The target-only column is filled with nulls.
        let region = batch
            .column(2)
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap();
        assert!(region.is_null(0));

        let values = batch
            .column(3)
            .as_any()
            .downcast_ref::<Float64Array>()
            .unwrap();
        assert_eq!(values.value(0), 1.0);
    }

    #[test]
    fn test_identify_missing_columns_from_proto() {
        let rows_schema = proto_schema(&["host", "instance"]);
        let target = ArrowSchema::new(vec![ts_field(), tag_field("host"), value_field()]);

        let missing = identify_missing_columns_from_proto(&rows_schema, &target).unwrap();
        assert_eq!(missing, vec!["instance".to_string()]);
    }

    #[test]
    fn test_build_prom_create_table_schema_from_proto() {
        let rows_schema = proto_schema(&["job"]);

        let schema = build_prom_create_table_schema_from_proto(&rows_schema).unwrap();
        assert_eq!(3, schema.len());

        assert_eq!("greptime_timestamp", schema[0].column_name);
        assert_eq!(SemanticType::Timestamp as i32, schema[0].semantic_type);
        assert_eq!(
            ColumnDataType::TimestampMillisecond as i32,
            schema[0].datatype
        );

        assert_eq!("job", schema[1].column_name);
        assert_eq!(SemanticType::Tag as i32, schema[1].semantic_type);
        assert_eq!(ColumnDataType::String as i32, schema[1].datatype);

        assert_eq!("greptime_value", schema[2].column_name);
        assert_eq!(SemanticType::Field as i32, schema[2].semantic_type);
        assert_eq!(ColumnDataType::Float64 as i32, schema[2].datatype);
    }
}