servers/
prom_store.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
//! Prometheus protocol support.
//! Handles the Prometheus remote_write and remote_read logic.
17use std::cmp::Ordering;
18use std::collections::BTreeMap;
19use std::hash::{Hash, Hasher};
20
21use api::prom_store::remote::label_matcher::Type as MatcherType;
22use api::prom_store::remote::{Label, Query, Sample, TimeSeries, WriteRequest};
23use api::v1::RowInsertRequests;
24use common_grpc::precision::Precision;
25use common_query::prelude::{GREPTIME_TIMESTAMP, GREPTIME_VALUE};
26use common_recordbatch::{RecordBatch, RecordBatches};
27use common_telemetry::tracing;
28use common_time::timestamp::TimeUnit;
29use datafusion::prelude::{col, lit, regexp_match, Expr};
30use datafusion_common::ScalarValue;
31use datafusion_expr::LogicalPlan;
32use datatypes::prelude::{ConcreteDataType, Value};
33use openmetrics_parser::{MetricsExposition, PrometheusType, PrometheusValue};
34use query::dataframe::DataFrame;
35use snafu::{ensure, OptionExt, ResultExt};
36use snap::raw::{Decoder, Encoder};
37
38use crate::error::{self, Result};
39use crate::row_writer::{self, MultiTableData};
40
/// Name of the special label that carries the metric name.
pub const METRIC_NAME_LABEL: &str = "__name__";

/// Byte-slice form of [`METRIC_NAME_LABEL`].
pub const METRIC_NAME_LABEL_BYTES: &[u8] = METRIC_NAME_LABEL.as_bytes();

/// The same as `FIELD_COLUMN_MATCHER` in `promql` crate
pub const FIELD_NAME_LABEL: &str = "__field__";
47
/// Metrics for push gateway protocol
///
/// Wraps a single exposition value from the `openmetrics_parser` crate.
pub struct Metrics {
    /// The exposition, parameterised with the Prometheus metric type and value type.
    pub exposition: MetricsExposition<PrometheusType, PrometheusValue>,
}
52
53/// Get table name from remote query
54pub fn table_name(q: &Query) -> Result<String> {
55    let label_matches = &q.matchers;
56
57    label_matches
58        .iter()
59        .find_map(|m| {
60            if m.name == METRIC_NAME_LABEL {
61                Some(m.value.to_string())
62            } else {
63                None
64            }
65        })
66        .context(error::InvalidPromRemoteRequestSnafu {
67            msg: "missing '__name__' label in timeseries",
68        })
69}
70
71/// Create a DataFrame from a remote Query
72#[tracing::instrument(skip_all)]
73pub fn query_to_plan(dataframe: DataFrame, q: &Query) -> Result<LogicalPlan> {
74    let DataFrame::DataFusion(dataframe) = dataframe;
75
76    let start_timestamp_ms = q.start_timestamp_ms;
77    let end_timestamp_ms = q.end_timestamp_ms;
78
79    let label_matches = &q.matchers;
80
81    let mut conditions = Vec::with_capacity(label_matches.len() + 1);
82
83    conditions.push(col(GREPTIME_TIMESTAMP).gt_eq(lit_timestamp_millisecond(start_timestamp_ms)));
84    conditions.push(col(GREPTIME_TIMESTAMP).lt_eq(lit_timestamp_millisecond(end_timestamp_ms)));
85
86    for m in label_matches {
87        let name = &m.name;
88
89        if name == METRIC_NAME_LABEL {
90            continue;
91        }
92
93        let value = &m.value;
94        let m_type = MatcherType::try_from(m.r#type).map_err(|e| {
95            error::InvalidPromRemoteRequestSnafu {
96                msg: format!("invalid LabelMatcher type, decode error: {e}",),
97            }
98            .build()
99        })?;
100
101        match m_type {
102            MatcherType::Eq => {
103                conditions.push(col(name).eq(lit(value)));
104            }
105            MatcherType::Neq => {
106                conditions.push(col(name).not_eq(lit(value)));
107            }
108            // Case sensitive regexp match
109            MatcherType::Re => {
110                conditions.push(regexp_match(col(name), lit(value), None).is_not_null());
111            }
112            // Case sensitive regexp not match
113            MatcherType::Nre => {
114                conditions.push(regexp_match(col(name), lit(value), None).is_null());
115            }
116        }
117    }
118
119    // Safety: conditions MUST not be empty, reduce always return Some(expr).
120    let conditions = conditions.into_iter().reduce(Expr::and).unwrap();
121
122    let dataframe = dataframe
123        .filter(conditions)
124        .context(error::DataFrameSnafu)?;
125
126    Ok(dataframe.into_parts().1)
127}
128
/// Builds a [`Label`] from an owned name and value.
#[inline]
fn new_label(name: String, value: String) -> Label {
    Label { name, value }
}
133
134fn lit_timestamp_millisecond(ts: i64) -> Expr {
135    Expr::Literal(ScalarValue::TimestampMillisecond(Some(ts), None))
136}
137
138// A timeseries id
139#[derive(Debug)]
140struct TimeSeriesId {
141    labels: Vec<Label>,
142}
143
144/// Because Label in protobuf doesn't impl `Eq`, so we have to do it by ourselves.
145impl PartialEq for TimeSeriesId {
146    fn eq(&self, other: &Self) -> bool {
147        if self.labels.len() != other.labels.len() {
148            return false;
149        }
150
151        self.labels
152            .iter()
153            .zip(other.labels.iter())
154            .all(|(l, r)| l.name == r.name && l.value == r.value)
155    }
156}
157impl Eq for TimeSeriesId {}
158
159impl Hash for TimeSeriesId {
160    fn hash<H: Hasher>(&self, state: &mut H) {
161        for label in &self.labels {
162            label.name.hash(state);
163            label.value.hash(state);
164        }
165    }
166}
167
168/// For Sorting timeseries
169impl Ord for TimeSeriesId {
170    fn cmp(&self, other: &Self) -> Ordering {
171        let ordering = self.labels.len().cmp(&other.labels.len());
172        if ordering != Ordering::Equal {
173            return ordering;
174        }
175
176        for (l, r) in self.labels.iter().zip(other.labels.iter()) {
177            let ordering = l.name.cmp(&r.name);
178
179            if ordering != Ordering::Equal {
180                return ordering;
181            }
182
183            let ordering = l.value.cmp(&r.value);
184
185            if ordering != Ordering::Equal {
186                return ordering;
187            }
188        }
189        Ordering::Equal
190    }
191}
192
193impl PartialOrd for TimeSeriesId {
194    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
195        Some(self.cmp(other))
196    }
197}
198
199/// Collect each row's timeseries id
200/// This processing is ugly, hope <https://github.com/GreptimeTeam/greptimedb/issues/336> making some progress in future.
201fn collect_timeseries_ids(table_name: &str, recordbatch: &RecordBatch) -> Vec<TimeSeriesId> {
202    let row_count = recordbatch.num_rows();
203    let mut timeseries_ids = Vec::with_capacity(row_count);
204
205    for row in 0..row_count {
206        let mut labels = Vec::with_capacity(recordbatch.num_columns() - 1);
207        labels.push(new_label(
208            METRIC_NAME_LABEL.to_string(),
209            table_name.to_string(),
210        ));
211
212        for (i, column_schema) in recordbatch.schema.column_schemas().iter().enumerate() {
213            if column_schema.name == GREPTIME_VALUE || column_schema.name == GREPTIME_TIMESTAMP {
214                continue;
215            }
216
217            let column = &recordbatch.columns()[i];
218            // A label with an empty label value is considered equivalent to a label that does not exist.
219            if column.is_null(row) {
220                continue;
221            }
222
223            let value = column.get(row).to_string();
224            labels.push(new_label(column_schema.name.clone(), value));
225        }
226        timeseries_ids.push(TimeSeriesId { labels });
227    }
228    timeseries_ids
229}
230
231pub fn recordbatches_to_timeseries(
232    table_name: &str,
233    recordbatches: RecordBatches,
234) -> Result<Vec<TimeSeries>> {
235    Ok(recordbatches
236        .take()
237        .into_iter()
238        .map(|x| recordbatch_to_timeseries(table_name, x))
239        .collect::<Result<Vec<_>>>()?
240        .into_iter()
241        .flatten()
242        .collect())
243}
244
245fn recordbatch_to_timeseries(table: &str, recordbatch: RecordBatch) -> Result<Vec<TimeSeries>> {
246    let ts_column = recordbatch.column_by_name(GREPTIME_TIMESTAMP).context(
247        error::InvalidPromRemoteReadQueryResultSnafu {
248            msg: "missing greptime_timestamp column in query result",
249        },
250    )?;
251    ensure!(
252        ts_column.data_type() == ConcreteDataType::timestamp_millisecond_datatype(),
253        error::InvalidPromRemoteReadQueryResultSnafu {
254            msg: format!(
255                "Expect timestamp column of datatype Timestamp(Millisecond), actual {:?}",
256                ts_column.data_type()
257            )
258        }
259    );
260
261    let field_column = recordbatch.column_by_name(GREPTIME_VALUE).context(
262        error::InvalidPromRemoteReadQueryResultSnafu {
263            msg: "missing greptime_value column in query result",
264        },
265    )?;
266    ensure!(
267        field_column.data_type() == ConcreteDataType::float64_datatype(),
268        error::InvalidPromRemoteReadQueryResultSnafu {
269            msg: format!(
270                "Expect value column of datatype Float64, actual {:?}",
271                field_column.data_type()
272            )
273        }
274    );
275
276    // First, collect each row's timeseries id
277    let timeseries_ids = collect_timeseries_ids(table, &recordbatch);
278    // Then, group timeseries by it's id.
279    let mut timeseries_map: BTreeMap<&TimeSeriesId, TimeSeries> = BTreeMap::default();
280
281    for (row, timeseries_id) in timeseries_ids.iter().enumerate() {
282        let timeseries = timeseries_map
283            .entry(timeseries_id)
284            .or_insert_with(|| TimeSeries {
285                labels: timeseries_id.labels.clone(),
286                ..Default::default()
287            });
288
289        if ts_column.is_null(row) || field_column.is_null(row) {
290            continue;
291        }
292
293        let value: f64 = match field_column.get(row) {
294            Value::Float64(value) => value.into(),
295            _ => unreachable!("checked by the \"ensure\" above"),
296        };
297        let timestamp = match ts_column.get(row) {
298            Value::Timestamp(t) if t.unit() == TimeUnit::Millisecond => t.value(),
299            _ => unreachable!("checked by the \"ensure\" above"),
300        };
301        let sample = Sample { value, timestamp };
302
303        timeseries.samples.push(sample);
304    }
305
306    Ok(timeseries_map.into_values().collect())
307}
308
309pub fn to_grpc_row_insert_requests(request: &WriteRequest) -> Result<(RowInsertRequests, usize)> {
310    let _timer = crate::metrics::METRIC_HTTP_PROM_STORE_CONVERT_ELAPSED.start_timer();
311
312    let mut multi_table_data = MultiTableData::new();
313
314    for series in &request.timeseries {
315        let table_name = &series
316            .labels
317            .iter()
318            .find(|label| {
319                // The metric name is a special label
320                label.name == METRIC_NAME_LABEL
321            })
322            .context(error::InvalidPromRemoteRequestSnafu {
323                msg: "missing '__name__' label in time-series",
324            })?
325            .value;
326
327        // The metric name is a special label,
328        // num_columns = labels.len() - 1 + 1 (value) + 1 (timestamp)
329        let num_columns = series.labels.len() + 1;
330
331        let table_data = multi_table_data.get_or_default_table_data(
332            table_name,
333            num_columns,
334            series.samples.len(),
335        );
336
337        // labels
338        let kvs = series.labels.iter().filter_map(|label| {
339            if label.name == METRIC_NAME_LABEL {
340                None
341            } else {
342                Some((label.name.clone(), label.value.clone()))
343            }
344        });
345
346        if series.samples.len() == 1 {
347            let mut one_row = table_data.alloc_one_row();
348
349            row_writer::write_tags(table_data, kvs, &mut one_row)?;
350            // value
351            row_writer::write_f64(
352                table_data,
353                GREPTIME_VALUE,
354                series.samples[0].value,
355                &mut one_row,
356            )?;
357            // timestamp
358            row_writer::write_ts_to_millis(
359                table_data,
360                GREPTIME_TIMESTAMP,
361                Some(series.samples[0].timestamp),
362                Precision::Millisecond,
363                &mut one_row,
364            )?;
365
366            table_data.add_row(one_row);
367        } else {
368            for Sample { value, timestamp } in &series.samples {
369                let mut one_row = table_data.alloc_one_row();
370
371                // labels
372                let kvs = kvs.clone();
373                row_writer::write_tags(table_data, kvs, &mut one_row)?;
374                // value
375                row_writer::write_f64(table_data, GREPTIME_VALUE, *value, &mut one_row)?;
376                // timestamp
377                row_writer::write_ts_to_millis(
378                    table_data,
379                    GREPTIME_TIMESTAMP,
380                    Some(*timestamp),
381                    Precision::Millisecond,
382                    &mut one_row,
383                )?;
384
385                table_data.add_row(one_row);
386            }
387        }
388    }
389
390    Ok(multi_table_data.into_row_insert_requests())
391}
392
393#[inline]
394pub fn snappy_decompress(buf: &[u8]) -> Result<Vec<u8>> {
395    let mut decoder = Decoder::new();
396    decoder
397        .decompress_vec(buf)
398        .context(error::DecompressSnappyPromRemoteRequestSnafu)
399}
400
401#[inline]
402pub fn snappy_compress(buf: &[u8]) -> Result<Vec<u8>> {
403    let mut encoder = Encoder::new();
404    encoder
405        .compress_vec(buf)
406        .context(error::CompressPromRemoteRequestSnafu)
407}
408
409#[inline]
410pub fn zstd_decompress(buf: &[u8]) -> Result<Vec<u8>> {
411    zstd::stream::decode_all(buf).context(error::DecompressZstdPromRemoteRequestSnafu)
412}
413
414/// Mock timeseries for test, it is both used in servers and frontend crate
415/// So we present it here
416pub fn mock_timeseries() -> Vec<TimeSeries> {
417    vec![
418        TimeSeries {
419            labels: vec![
420                new_label(METRIC_NAME_LABEL.to_string(), "metric1".to_string()),
421                new_label("job".to_string(), "spark".to_string()),
422            ],
423            samples: vec![
424                Sample {
425                    value: 1.0f64,
426                    timestamp: 1000,
427                },
428                Sample {
429                    value: 2.0f64,
430                    timestamp: 2000,
431                },
432            ],
433            ..Default::default()
434        },
435        TimeSeries {
436            labels: vec![
437                new_label(METRIC_NAME_LABEL.to_string(), "metric2".to_string()),
438                new_label("instance".to_string(), "test_host1".to_string()),
439                new_label("idc".to_string(), "z001".to_string()),
440            ],
441            samples: vec![
442                Sample {
443                    value: 3.0f64,
444                    timestamp: 1000,
445                },
446                Sample {
447                    value: 4.0f64,
448                    timestamp: 2000,
449                },
450            ],
451            ..Default::default()
452        },
453        TimeSeries {
454            labels: vec![
455                new_label(METRIC_NAME_LABEL.to_string(), "metric3".to_string()),
456                new_label("idc".to_string(), "z002".to_string()),
457                new_label("app".to_string(), "biz".to_string()),
458            ],
459            samples: vec![
460                Sample {
461                    value: 5.0f64,
462                    timestamp: 1000,
463                },
464                Sample {
465                    value: 6.0f64,
466                    timestamp: 2000,
467                },
468                Sample {
469                    value: 7.0f64,
470                    timestamp: 3000,
471                },
472            ],
473            ..Default::default()
474        },
475    ]
476}
477
478/// Add new labels to the mock timeseries.
479pub fn mock_timeseries_new_label() -> Vec<TimeSeries> {
480    let ts_demo_metrics = TimeSeries {
481        labels: vec![
482            new_label(METRIC_NAME_LABEL.to_string(), "demo_metrics".to_string()),
483            new_label("idc".to_string(), "idc3".to_string()),
484            new_label("new_label1".to_string(), "foo".to_string()),
485        ],
486        samples: vec![Sample {
487            value: 42.0,
488            timestamp: 3000,
489        }],
490        ..Default::default()
491    };
492    let ts_multi_labels = TimeSeries {
493        labels: vec![
494            new_label(METRIC_NAME_LABEL.to_string(), "metric1".to_string()),
495            new_label("idc".to_string(), "idc4".to_string()),
496            new_label("env".to_string(), "prod".to_string()),
497            new_label("host".to_string(), "host9".to_string()),
498            new_label("new_label2".to_string(), "bar".to_string()),
499        ],
500        samples: vec![Sample {
501            value: 99.0,
502            timestamp: 4000,
503        }],
504        ..Default::default()
505    };
506
507    vec![ts_demo_metrics, ts_multi_labels]
508}
509
510#[cfg(test)]
511mod tests {
512    use std::sync::Arc;
513
514    use api::prom_store::remote::LabelMatcher;
515    use api::v1::{ColumnDataType, Row, SemanticType};
516    use datafusion::prelude::SessionContext;
517    use datatypes::schema::{ColumnSchema, Schema};
518    use datatypes::vectors::{Float64Vector, StringVector, TimestampMillisecondVector};
519    use table::table::adapter::DfTableProviderAdapter;
520    use table::test_util::MemTable;
521
522    use super::*;
523
    // Matcher types as the raw `i32` stored in `LabelMatcher::r#type`.
    const EQ_TYPE: i32 = MatcherType::Eq as i32;
    const NEQ_TYPE: i32 = MatcherType::Neq as i32;
    const RE_TYPE: i32 = MatcherType::Re as i32;
527
528    #[test]
529    fn test_table_name() {
530        let q = Query {
531            start_timestamp_ms: 1000,
532            end_timestamp_ms: 2000,
533            matchers: vec![],
534            ..Default::default()
535        };
536        let err = table_name(&q).unwrap_err();
537        assert!(matches!(err, error::Error::InvalidPromRemoteRequest { .. }));
538
539        let q = Query {
540            start_timestamp_ms: 1000,
541            end_timestamp_ms: 2000,
542            matchers: vec![LabelMatcher {
543                name: METRIC_NAME_LABEL.to_string(),
544                value: "test".to_string(),
545                r#type: EQ_TYPE,
546            }],
547            ..Default::default()
548        };
549        assert_eq!("test", table_name(&q).unwrap());
550    }
551
552    #[test]
553    fn test_query_to_plan() {
554        let q = Query {
555            start_timestamp_ms: 1000,
556            end_timestamp_ms: 2000,
557            matchers: vec![LabelMatcher {
558                name: METRIC_NAME_LABEL.to_string(),
559                value: "test".to_string(),
560                r#type: EQ_TYPE,
561            }],
562            ..Default::default()
563        };
564
565        let schema = Arc::new(Schema::new(vec![
566            ColumnSchema::new(
567                GREPTIME_TIMESTAMP,
568                ConcreteDataType::timestamp_millisecond_datatype(),
569                true,
570            ),
571            ColumnSchema::new(GREPTIME_VALUE, ConcreteDataType::float64_datatype(), true),
572            ColumnSchema::new("instance", ConcreteDataType::string_datatype(), true),
573            ColumnSchema::new("job", ConcreteDataType::string_datatype(), true),
574        ]));
575        let recordbatch = RecordBatch::new(
576            schema,
577            vec![
578                Arc::new(TimestampMillisecondVector::from_vec(vec![1000])) as _,
579                Arc::new(Float64Vector::from_vec(vec![3.0])) as _,
580                Arc::new(StringVector::from(vec!["host1"])) as _,
581                Arc::new(StringVector::from(vec!["job"])) as _,
582            ],
583        )
584        .unwrap();
585
586        let ctx = SessionContext::new();
587        let table = MemTable::table("test", recordbatch);
588        let table_provider = Arc::new(DfTableProviderAdapter::new(table));
589
590        let dataframe = ctx.read_table(table_provider.clone()).unwrap();
591        let plan = query_to_plan(DataFrame::DataFusion(dataframe), &q).unwrap();
592        let display_string = format!("{}", plan.display_indent());
593
594        assert_eq!("Filter: ?table?.greptime_timestamp >= TimestampMillisecond(1000, None) AND ?table?.greptime_timestamp <= TimestampMillisecond(2000, None)\n  TableScan: ?table?", display_string);
595
596        let q = Query {
597            start_timestamp_ms: 1000,
598            end_timestamp_ms: 2000,
599            matchers: vec![
600                LabelMatcher {
601                    name: METRIC_NAME_LABEL.to_string(),
602                    value: "test".to_string(),
603                    r#type: EQ_TYPE,
604                },
605                LabelMatcher {
606                    name: "job".to_string(),
607                    value: "*prom*".to_string(),
608                    r#type: RE_TYPE,
609                },
610                LabelMatcher {
611                    name: "instance".to_string(),
612                    value: "localhost".to_string(),
613                    r#type: NEQ_TYPE,
614                },
615            ],
616            ..Default::default()
617        };
618
619        let dataframe = ctx.read_table(table_provider).unwrap();
620        let plan = query_to_plan(DataFrame::DataFusion(dataframe), &q).unwrap();
621        let display_string = format!("{}", plan.display_indent());
622
623        assert_eq!("Filter: ?table?.greptime_timestamp >= TimestampMillisecond(1000, None) AND ?table?.greptime_timestamp <= TimestampMillisecond(2000, None) AND regexp_match(?table?.job, Utf8(\"*prom*\")) IS NOT NULL AND ?table?.instance != Utf8(\"localhost\")\n  TableScan: ?table?", display_string);
624    }
625
626    fn column_schemas_with(
627        mut kts_iter: Vec<(&str, ColumnDataType, SemanticType)>,
628    ) -> Vec<api::v1::ColumnSchema> {
629        kts_iter.push((
630            "greptime_value",
631            ColumnDataType::Float64,
632            SemanticType::Field,
633        ));
634        kts_iter.push((
635            "greptime_timestamp",
636            ColumnDataType::TimestampMillisecond,
637            SemanticType::Timestamp,
638        ));
639
640        kts_iter
641            .into_iter()
642            .map(|(k, t, s)| api::v1::ColumnSchema {
643                column_name: k.to_string(),
644                datatype: t as i32,
645                semantic_type: s as i32,
646                ..Default::default()
647            })
648            .collect()
649    }
650
651    fn make_row_with_label(l1: &str, value: f64, timestamp: i64) -> Row {
652        Row {
653            values: vec![
654                api::v1::Value {
655                    value_data: Some(api::v1::value::ValueData::StringValue(l1.to_string())),
656                },
657                api::v1::Value {
658                    value_data: Some(api::v1::value::ValueData::F64Value(value)),
659                },
660                api::v1::Value {
661                    value_data: Some(api::v1::value::ValueData::TimestampMillisecondValue(
662                        timestamp,
663                    )),
664                },
665            ],
666        }
667    }
668
669    fn make_row_with_2_labels(l1: &str, l2: &str, value: f64, timestamp: i64) -> Row {
670        Row {
671            values: vec![
672                api::v1::Value {
673                    value_data: Some(api::v1::value::ValueData::StringValue(l1.to_string())),
674                },
675                api::v1::Value {
676                    value_data: Some(api::v1::value::ValueData::StringValue(l2.to_string())),
677                },
678                api::v1::Value {
679                    value_data: Some(api::v1::value::ValueData::F64Value(value)),
680                },
681                api::v1::Value {
682                    value_data: Some(api::v1::value::ValueData::TimestampMillisecondValue(
683                        timestamp,
684                    )),
685                },
686            ],
687        }
688    }
689
690    #[test]
691    fn test_write_request_to_row_insert_exprs() {
692        let write_request = WriteRequest {
693            timeseries: mock_timeseries(),
694            ..Default::default()
695        };
696
697        let mut exprs = to_grpc_row_insert_requests(&write_request)
698            .unwrap()
699            .0
700            .inserts;
701        exprs.sort_unstable_by(|l, r| l.table_name.cmp(&r.table_name));
702        assert_eq!(3, exprs.len());
703        assert_eq!("metric1", exprs[0].table_name);
704        assert_eq!("metric2", exprs[1].table_name);
705        assert_eq!("metric3", exprs[2].table_name);
706
707        let rows = exprs[0].rows.as_ref().unwrap();
708        let schema = &rows.schema;
709        let rows = &rows.rows;
710        assert_eq!(2, rows.len());
711        assert_eq!(3, schema.len());
712        assert_eq!(
713            column_schemas_with(vec![("job", ColumnDataType::String, SemanticType::Tag)]),
714            *schema
715        );
716        assert_eq!(
717            &vec![
718                make_row_with_label("spark", 1.0, 1000),
719                make_row_with_label("spark", 2.0, 2000),
720            ],
721            rows
722        );
723
724        let rows = exprs[1].rows.as_ref().unwrap();
725        let schema = &rows.schema;
726        let rows = &rows.rows;
727        assert_eq!(2, rows.len());
728        assert_eq!(4, schema.len());
729        assert_eq!(
730            column_schemas_with(vec![
731                ("instance", ColumnDataType::String, SemanticType::Tag),
732                ("idc", ColumnDataType::String, SemanticType::Tag)
733            ]),
734            *schema
735        );
736        assert_eq!(
737            &vec![
738                make_row_with_2_labels("test_host1", "z001", 3.0, 1000),
739                make_row_with_2_labels("test_host1", "z001", 4.0, 2000),
740            ],
741            rows
742        );
743
744        let rows = exprs[2].rows.as_ref().unwrap();
745        let schema = &rows.schema;
746        let rows = &rows.rows;
747        assert_eq!(3, rows.len());
748        assert_eq!(4, schema.len());
749        assert_eq!(
750            column_schemas_with(vec![
751                ("idc", ColumnDataType::String, SemanticType::Tag),
752                ("app", ColumnDataType::String, SemanticType::Tag)
753            ]),
754            *schema
755        );
756        assert_eq!(
757            &vec![
758                make_row_with_2_labels("z002", "biz", 5.0, 1000),
759                make_row_with_2_labels("z002", "biz", 6.0, 2000),
760                make_row_with_2_labels("z002", "biz", 7.0, 3000),
761            ],
762            rows
763        );
764    }
765
766    #[test]
767    fn test_recordbatches_to_timeseries() {
768        let schema = Arc::new(Schema::new(vec![
769            ColumnSchema::new(
770                GREPTIME_TIMESTAMP,
771                ConcreteDataType::timestamp_millisecond_datatype(),
772                true,
773            ),
774            ColumnSchema::new(GREPTIME_VALUE, ConcreteDataType::float64_datatype(), true),
775            ColumnSchema::new("instance", ConcreteDataType::string_datatype(), true),
776        ]));
777
778        let recordbatches = RecordBatches::try_new(
779            schema.clone(),
780            vec![
781                RecordBatch::new(
782                    schema.clone(),
783                    vec![
784                        Arc::new(TimestampMillisecondVector::from_vec(vec![1000])) as _,
785                        Arc::new(Float64Vector::from_vec(vec![3.0])) as _,
786                        Arc::new(StringVector::from(vec!["host1"])) as _,
787                    ],
788                )
789                .unwrap(),
790                RecordBatch::new(
791                    schema,
792                    vec![
793                        Arc::new(TimestampMillisecondVector::from_vec(vec![2000])) as _,
794                        Arc::new(Float64Vector::from_vec(vec![7.0])) as _,
795                        Arc::new(StringVector::from(vec!["host2"])) as _,
796                    ],
797                )
798                .unwrap(),
799            ],
800        )
801        .unwrap();
802
803        let timeseries = recordbatches_to_timeseries("metric1", recordbatches).unwrap();
804        assert_eq!(2, timeseries.len());
805
806        assert_eq!(
807            vec![
808                Label {
809                    name: METRIC_NAME_LABEL.to_string(),
810                    value: "metric1".to_string(),
811                },
812                Label {
813                    name: "instance".to_string(),
814                    value: "host1".to_string(),
815                },
816            ],
817            timeseries[0].labels
818        );
819
820        assert_eq!(
821            timeseries[0].samples,
822            vec![Sample {
823                value: 3.0,
824                timestamp: 1000,
825            }]
826        );
827
828        assert_eq!(
829            vec![
830                Label {
831                    name: METRIC_NAME_LABEL.to_string(),
832                    value: "metric1".to_string(),
833                },
834                Label {
835                    name: "instance".to_string(),
836                    value: "host2".to_string(),
837                },
838            ],
839            timeseries[1].labels
840        );
841        assert_eq!(
842            timeseries[1].samples,
843            vec![Sample {
844                value: 7.0,
845                timestamp: 2000,
846            }]
847        );
848    }
849}