Skip to main content

servers/
prom_row_builder.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Prometheus row-level helpers for converting proto `Rows` into Arrow
16//! `RecordBatch`es and aligning / normalizing their schemas against
17//! existing table schemas in the catalog.
18
19use std::collections::{HashMap, HashSet};
20use std::sync::Arc;
21
22use api::helper::ColumnDataTypeWrapper;
23use api::v1::value::ValueData;
24use api::v1::{ColumnSchema, Rows, SemanticType};
25use arrow::array::{
26    ArrayRef, Float64Builder, StringBuilder, TimestampMicrosecondBuilder,
27    TimestampMillisecondBuilder, TimestampNanosecondBuilder, TimestampSecondBuilder,
28    new_null_array,
29};
30use arrow::datatypes::{DataType as ArrowDataType, Schema as ArrowSchema};
31use arrow::record_batch::RecordBatch;
32use arrow_schema::TimeUnit;
33use common_query::prelude::{greptime_timestamp, greptime_value};
34use datatypes::data_type::DataType;
35use datatypes::prelude::ConcreteDataType;
36use snafu::{OptionExt, ResultExt, ensure};
37
38use crate::error;
39use crate::error::Result;
40
41/// Extract timestamp, field, and tag column names from a logical region schema.
42fn unzip_logical_region_schema(
43    target_schema: &ArrowSchema,
44) -> Result<(String, String, HashSet<String>)> {
45    let mut timestamp_column = None;
46    let mut field_column = None;
47    let mut tag_columns = HashSet::with_capacity(target_schema.fields.len().saturating_sub(2));
48    for field in target_schema.fields() {
49        if field.name() == greptime_timestamp() {
50            timestamp_column = Some(field.name().clone());
51            continue;
52        }
53
54        if field.name() == greptime_value() {
55            field_column = Some(field.name().clone());
56            continue;
57        }
58
59        if timestamp_column.is_none() && matches!(field.data_type(), ArrowDataType::Timestamp(_, _))
60        {
61            timestamp_column = Some(field.name().clone());
62            continue;
63        }
64
65        if field_column.is_none() && matches!(field.data_type(), ArrowDataType::Float64) {
66            field_column = Some(field.name().clone());
67            continue;
68        }
69        tag_columns.insert(field.name().clone());
70    }
71
72    let timestamp_column = timestamp_column.with_context(|| error::UnexpectedResultSnafu {
73        reason: "Failed to locate timestamp column in target schema".to_string(),
74    })?;
75    let field_column = field_column.with_context(|| error::UnexpectedResultSnafu {
76        reason: "Failed to locate field column in target schema".to_string(),
77    })?;
78
79    Ok((timestamp_column, field_column, tag_columns))
80}
81
82/// Directly converts proto `Rows` into a `RecordBatch` aligned to the given
83/// `target_schema`, handling Prometheus column renaming (timestamp/value),
84/// reordering, type casting, and null-filling in a single pass.
85pub(crate) fn rows_to_aligned_record_batch(
86    rows: &Rows,
87    target_schema: &ArrowSchema,
88) -> Result<RecordBatch> {
89    let row_count = rows.rows.len();
90    let column_count = rows.schema.len();
91
92    for (idx, row) in rows.rows.iter().enumerate() {
93        ensure!(
94            row.values.len() == column_count,
95            error::InternalSnafu {
96                err_msg: format!(
97                    "Column count mismatch in row {}, expected {}, got {}",
98                    idx,
99                    column_count,
100                    row.values.len()
101                )
102            }
103        );
104    }
105
106    let (target_ts_name, target_field_name, _target_tags) =
107        unzip_logical_region_schema(target_schema)?;
108
109    // Map effective target column name → (source column index, source arrow type).
110    // Handles prom renames: Timestamp → target ts name, Float64 → target field name.
111    let mut source_map: HashMap<&str, (usize, ArrowDataType)> =
112        HashMap::with_capacity(rows.schema.len());
113
114    for (src_idx, col) in rows.schema.iter().enumerate() {
115        let wrapper = ColumnDataTypeWrapper::try_new(col.datatype, col.datatype_extension.clone())?;
116        let src_arrow_type = ConcreteDataType::from(wrapper).as_arrow_type();
117
118        match &src_arrow_type {
119            ArrowDataType::Float64 => {
120                source_map.insert(&target_field_name, (src_idx, src_arrow_type));
121            }
122            ArrowDataType::Timestamp(unit, _) => {
123                ensure!(
124                    unit == &TimeUnit::Millisecond,
125                    error::InvalidPromRemoteRequestSnafu {
126                        msg: format!(
127                            "Unexpected remote write batch timestamp unit, expect millisecond, got: {}",
128                            unit
129                        )
130                    }
131                );
132                source_map.insert(&target_ts_name, (src_idx, src_arrow_type));
133            }
134            ArrowDataType::Utf8 => {
135                source_map.insert(&col.column_name, (src_idx, src_arrow_type));
136            }
137            other => {
138                return error::InvalidPromRemoteRequestSnafu {
139                    msg: format!(
140                        "Unexpected remote write batch field type {}, field name: {}",
141                        other, col.column_name
142                    ),
143                }
144                .fail();
145            }
146        }
147    }
148
149    // Build columns in target schema order
150    let mut columns = Vec::with_capacity(target_schema.fields().len());
151    for target_field in target_schema.fields() {
152        if let Some((src_idx, src_arrow_type)) = source_map.get(target_field.name().as_str()) {
153            let array = build_arrow_array(
154                rows,
155                *src_idx,
156                &rows.schema[*src_idx].column_name,
157                src_arrow_type.clone(),
158                row_count,
159            )?;
160            columns.push(array);
161        } else {
162            columns.push(new_null_array(target_field.data_type(), row_count));
163        }
164    }
165
166    let batch = RecordBatch::try_new(Arc::new(target_schema.clone()), columns)
167        .context(error::ArrowSnafu)?;
168    Ok(batch)
169}
170
171/// Identify tag columns in the proto `rows_schema` that are absent from the
172/// target region schema, without building an intermediate `RecordBatch`.
173pub(crate) fn identify_missing_columns_from_proto(
174    rows_schema: &[ColumnSchema],
175    target_schema: &ArrowSchema,
176) -> Result<Vec<String>> {
177    let (_, _, target_tags) = unzip_logical_region_schema(target_schema)?;
178    let mut missing = Vec::new();
179    for col in rows_schema {
180        let wrapper = ColumnDataTypeWrapper::try_new(col.datatype, col.datatype_extension.clone())?;
181        let arrow_type = ConcreteDataType::from(wrapper).as_arrow_type();
182        if matches!(arrow_type, ArrowDataType::Utf8)
183            && !target_tags.contains(&col.column_name)
184            && target_schema.column_with_name(&col.column_name).is_none()
185        {
186            missing.push(col.column_name.clone());
187        }
188    }
189    Ok(missing)
190}
191
192/// Build a `Vec<ColumnSchema>` suitable for creating a new Prometheus logical table
193/// directly from the proto `rows.schema`, avoiding the round-trip through Arrow schema.
194pub fn build_prom_create_table_schema_from_proto(
195    rows_schema: &[ColumnSchema],
196) -> Result<Vec<ColumnSchema>> {
197    rows_schema
198        .iter()
199        .map(|col| {
200            let semantic_type = if col.datatype == api::v1::ColumnDataType::TimestampMillisecond as i32 {
201                SemanticType::Timestamp
202            } else if col.datatype == api::v1::ColumnDataType::Float64 as i32 {
203                SemanticType::Field
204            } else {
205                // tag columns must be String type
206                ensure!(col.datatype == api::v1::ColumnDataType::String as i32, error::InvalidPromRemoteRequestSnafu{
207                                        msg: format!(
208                        "Failed to build create table schema, tag column '{}' must be String but got datatype {}",
209                        col.column_name, col.datatype
210                    )
211                });
212                SemanticType::Tag
213            };
214
215            Ok(ColumnSchema {
216                column_name: col.column_name.clone(),
217                datatype: col.datatype,
218                semantic_type: semantic_type as i32,
219                datatype_extension: col.datatype_extension.clone(),
220                options: None,
221            })
222        })
223        .collect()
224}
225
226/// Build a single Arrow array for the given column index from proto `Rows`.
227fn build_arrow_array(
228    rows: &Rows,
229    col_idx: usize,
230    column_name: &String,
231    column_data_type: arrow::datatypes::DataType,
232    row_count: usize,
233) -> Result<ArrayRef> {
234    macro_rules! build_array {
235        ($builder:expr, $( $pattern:pat => $value:expr ),+ $(,)?) => {{
236            let mut builder = $builder;
237            for row in &rows.rows {
238                match row.values[col_idx].value_data.as_ref() {
239                    $(Some($pattern) => builder.append_value($value),)+
240                    Some(v) => {
241                        return error::InvalidPromRemoteRequestSnafu {
242                            msg: format!("Unexpected value: {:?}", v),
243                        }
244                        .fail();
245                    }
246                    None => builder.append_null(),
247                }
248            }
249            Arc::new(builder.finish()) as ArrayRef
250        }};
251    }
252
253    let array: ArrayRef = match column_data_type {
254        arrow::datatypes::DataType::Float64 => {
255            build_array!(Float64Builder::with_capacity(row_count), ValueData::F64Value(v) => *v)
256        }
257        arrow::datatypes::DataType::Utf8 => build_array!(
258            StringBuilder::with_capacity(row_count, 0),
259            ValueData::StringValue(v) => v
260        ),
261        arrow::datatypes::DataType::Timestamp(u, _) => match u {
262            TimeUnit::Second => build_array!(
263                TimestampSecondBuilder::with_capacity(row_count),
264                ValueData::TimestampSecondValue(v) => *v
265            ),
266            TimeUnit::Millisecond => build_array!(
267                TimestampMillisecondBuilder::with_capacity(row_count),
268                ValueData::TimestampMillisecondValue(v) => *v
269            ),
270            TimeUnit::Microsecond => build_array!(
271                TimestampMicrosecondBuilder::with_capacity(row_count),
272                ValueData::DatetimeValue(v) => *v,
273                ValueData::TimestampMicrosecondValue(v) => *v
274            ),
275            TimeUnit::Nanosecond => build_array!(
276                TimestampNanosecondBuilder::with_capacity(row_count),
277                ValueData::TimestampNanosecondValue(v) => *v
278            ),
279        },
280        ty => {
281            return error::InvalidPromRemoteRequestSnafu {
282                msg: format!(
283                    "Unexpected column type {:?}, column name: {}",
284                    ty, column_name
285                ),
286            }
287            .fail();
288        }
289    };
290
291    Ok(array)
292}
293
#[cfg(test)]
mod tests {
    use api::v1::value::ValueData;
    use api::v1::{ColumnDataType, ColumnSchema, Row, Rows, SemanticType, Value};
    use arrow::array::{Array, Float64Array, StringArray, TimestampMillisecondArray};
    use arrow::datatypes::{DataType, Field, Schema as ArrowSchema, TimeUnit};

    use super::{
        build_prom_create_table_schema_from_proto, identify_missing_columns_from_proto,
        rows_to_aligned_record_batch,
    };

    /// Builds a proto column schema with no datatype extension or options.
    fn column(name: &str, datatype: ColumnDataType, semantic_type: SemanticType) -> ColumnSchema {
        ColumnSchema {
            column_name: name.to_string(),
            datatype: datatype as i32,
            semantic_type: semantic_type as i32,
            ..Default::default()
        }
    }

    /// Builds a Prometheus-style proto schema: `greptime_timestamp`, then one
    /// String tag per entry of `tags`, then `greptime_value`.
    fn prom_columns(tags: &[&str]) -> Vec<ColumnSchema> {
        let mut columns = Vec::with_capacity(tags.len() + 2);
        columns.push(column(
            "greptime_timestamp",
            ColumnDataType::TimestampMillisecond,
            SemanticType::Timestamp,
        ));
        columns.extend(
            tags.iter()
                .map(|tag| column(tag, ColumnDataType::String, SemanticType::Tag)),
        );
        columns.push(column(
            "greptime_value",
            ColumnDataType::Float64,
            SemanticType::Field,
        ));
        columns
    }

    /// Builds one proto row laid out like `prom_columns`: timestamp, tag
    /// values, then the sample value.
    fn prom_row(ts: i64, tag_values: &[&str], value: f64) -> Row {
        let mut values = Vec::with_capacity(tag_values.len() + 2);
        values.push(Value {
            value_data: Some(ValueData::TimestampMillisecondValue(ts)),
        });
        values.extend(tag_values.iter().map(|v| Value {
            value_data: Some(ValueData::StringValue(v.to_string())),
        }));
        values.push(Value {
            value_data: Some(ValueData::F64Value(value)),
        });
        Row { values }
    }

    /// Builds a logical-region-like target schema whose timestamp (`my_ts`)
    /// and field (`my_value`) columns are renamed, around the given tags.
    fn target_schema(tags: &[&str]) -> ArrowSchema {
        let mut fields = Vec::with_capacity(tags.len() + 2);
        fields.push(Field::new(
            "my_ts",
            DataType::Timestamp(TimeUnit::Millisecond, None),
            false,
        ));
        fields.extend(tags.iter().map(|tag| Field::new(*tag, DataType::Utf8, true)));
        fields.push(Field::new("my_value", DataType::Float64, true));
        ArrowSchema::new(fields)
    }

    #[test]
    fn test_rows_to_aligned_record_batch_renames_and_reorders() {
        let rows = Rows {
            schema: prom_columns(&["host"]),
            rows: vec![
                prom_row(1000, &["h1"], 42.0),
                prom_row(2000, &["h2"], 99.0),
            ],
        };

        // Target schema has renamed columns and different ordering.
        let target = target_schema(&["host"]);

        let batch = rows_to_aligned_record_batch(&rows, &target).unwrap();
        assert_eq!(batch.schema().as_ref(), &target);
        assert_eq!(2, batch.num_rows());
        assert_eq!(3, batch.num_columns());

        let ts = batch
            .column(0)
            .as_any()
            .downcast_ref::<TimestampMillisecondArray>()
            .unwrap();
        assert_eq!(ts.value(0), 1000);
        assert_eq!(ts.value(1), 2000);

        let hosts = batch
            .column(1)
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap();
        assert_eq!(hosts.value(0), "h1");
        assert_eq!(hosts.value(1), "h2");

        let values = batch
            .column(2)
            .as_any()
            .downcast_ref::<Float64Array>()
            .unwrap();
        assert_eq!(values.value(0), 42.0);
        assert_eq!(values.value(1), 99.0);
    }

    #[test]
    fn test_rows_to_aligned_record_batch_fills_nulls() {
        let rows = Rows {
            schema: prom_columns(&["host", "instance"]),
            rows: vec![prom_row(1000, &["h1", "i1"], 1.0)],
        };

        // Target schema has "host" but not "instance"; also has "region" which is missing from source.
        let target = target_schema(&["host", "region"]);

        let batch = rows_to_aligned_record_batch(&rows, &target).unwrap();
        assert_eq!(batch.schema().as_ref(), &target);
        assert_eq!(1, batch.num_rows());
        assert_eq!(4, batch.num_columns());

        let hosts = batch
            .column(1)
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap();
        assert_eq!(hosts.value(0), "h1");

        // "region" column should be null-filled.
        let region = batch
            .column(2)
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap();
        assert!(region.is_null(0));
    }

    #[test]
    fn test_rows_to_aligned_record_batch_rejects_row_length_mismatch() {
        let mut short_row = prom_row(1000, &["h1"], 1.0);
        short_row.values.pop();
        let rows = Rows {
            schema: prom_columns(&["host"]),
            rows: vec![short_row],
        };

        assert!(rows_to_aligned_record_batch(&rows, &target_schema(&["host"])).is_err());
    }

    #[test]
    fn test_rows_to_aligned_record_batch_rejects_non_millisecond_timestamp() {
        let mut schema = prom_columns(&["host"]);
        schema[0].datatype = ColumnDataType::TimestampSecond as i32;
        let rows = Rows {
            schema,
            rows: vec![],
        };

        assert!(rows_to_aligned_record_batch(&rows, &target_schema(&["host"])).is_err());
    }

    #[test]
    fn test_rows_to_aligned_record_batch_rejects_unsupported_type() {
        let mut schema = prom_columns(&[]);
        schema.push(column("count", ColumnDataType::Int64, SemanticType::Field));
        let rows = Rows {
            schema,
            rows: vec![],
        };

        assert!(rows_to_aligned_record_batch(&rows, &target_schema(&[])).is_err());
    }

    #[test]
    fn test_identify_missing_columns_from_proto() {
        let rows_schema = prom_columns(&["host", "instance"]);
        let target = target_schema(&["host"]);

        let missing = identify_missing_columns_from_proto(&rows_schema, &target).unwrap();
        assert_eq!(missing, vec!["instance".to_string()]);
    }

    #[test]
    fn test_identify_missing_columns_requires_timestamp_and_field() {
        // No timestamp and no Float64 column: not a valid logical region schema.
        let target = ArrowSchema::new(vec![Field::new("host", DataType::Utf8, true)]);

        assert!(identify_missing_columns_from_proto(&prom_columns(&["host"]), &target).is_err());
    }

    #[test]
    fn test_build_prom_create_table_schema_from_proto() {
        let rows_schema = prom_columns(&["job"]);

        let schema = build_prom_create_table_schema_from_proto(&rows_schema).unwrap();
        assert_eq!(3, schema.len());

        assert_eq!("greptime_timestamp", schema[0].column_name);
        assert_eq!(SemanticType::Timestamp as i32, schema[0].semantic_type);
        assert_eq!(
            ColumnDataType::TimestampMillisecond as i32,
            schema[0].datatype
        );

        assert_eq!("job", schema[1].column_name);
        assert_eq!(SemanticType::Tag as i32, schema[1].semantic_type);
        assert_eq!(ColumnDataType::String as i32, schema[1].datatype);

        assert_eq!("greptime_value", schema[2].column_name);
        assert_eq!(SemanticType::Field as i32, schema[2].semantic_type);
        assert_eq!(ColumnDataType::Float64 as i32, schema[2].datatype);
    }

    #[test]
    fn test_build_prom_create_table_schema_rejects_non_string_tag() {
        let mut rows_schema = prom_columns(&["job"]);
        rows_schema[1].datatype = ColumnDataType::Int64 as i32;

        assert!(build_prom_create_table_schema_from_proto(&rows_schema).is_err());
    }
}