servers/
prom_store.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
//! Prometheus protocol support.
//! Handles Prometheus remote_write and remote_read logic.
17use std::cmp::Ordering;
18use std::collections::BTreeMap;
19use std::hash::{Hash, Hasher};
20
21use api::prom_store::remote::label_matcher::Type as MatcherType;
22use api::prom_store::remote::{Label, Query, Sample, TimeSeries, WriteRequest};
23use api::v1::RowInsertRequests;
24use common_grpc::precision::Precision;
25use common_query::prelude::{GREPTIME_TIMESTAMP, GREPTIME_VALUE};
26use common_recordbatch::{RecordBatch, RecordBatches};
27use common_telemetry::tracing;
28use common_time::timestamp::TimeUnit;
29use datafusion::prelude::{col, lit, regexp_match, Expr};
30use datafusion_common::ScalarValue;
31use datafusion_expr::LogicalPlan;
32use datatypes::prelude::{ConcreteDataType, Value};
33use openmetrics_parser::{MetricsExposition, PrometheusType, PrometheusValue};
34use query::dataframe::DataFrame;
35use snafu::{ensure, OptionExt, ResultExt};
36use snap::raw::{Decoder, Encoder};
37
38use crate::error::{self, Result};
39use crate::row_writer::{self, MultiTableData};
40
/// Label that carries a series' metric name. Its value is used as the
/// GreptimeDB table name for both remote write and remote read.
pub const METRIC_NAME_LABEL: &str = "__name__";

/// Byte-string form of [`METRIC_NAME_LABEL`].
pub const METRIC_NAME_LABEL_BYTES: &[u8] = b"__name__";

/// The same as `FIELD_COLUMN_MATCHER` in `promql` crate
pub const FIELD_NAME_LABEL: &str = "__field__";

/// Metrics for push gateway protocol
pub struct Metrics {
    /// Exposition parsed by `openmetrics_parser`.
    pub exposition: MetricsExposition<PrometheusType, PrometheusValue>,
}
52
53/// Get table name from remote query
54pub fn table_name(q: &Query) -> Result<String> {
55    let label_matches = &q.matchers;
56
57    label_matches
58        .iter()
59        .find_map(|m| {
60            if m.name == METRIC_NAME_LABEL {
61                Some(m.value.to_string())
62            } else {
63                None
64            }
65        })
66        .context(error::InvalidPromRemoteRequestSnafu {
67            msg: "missing '__name__' label in timeseries",
68        })
69}
70
71/// Create a DataFrame from a remote Query
72#[tracing::instrument(skip_all)]
73pub fn query_to_plan(dataframe: DataFrame, q: &Query) -> Result<LogicalPlan> {
74    let DataFrame::DataFusion(dataframe) = dataframe;
75
76    let start_timestamp_ms = q.start_timestamp_ms;
77    let end_timestamp_ms = q.end_timestamp_ms;
78
79    let label_matches = &q.matchers;
80
81    let mut conditions = Vec::with_capacity(label_matches.len() + 1);
82
83    conditions.push(col(GREPTIME_TIMESTAMP).gt_eq(lit_timestamp_millisecond(start_timestamp_ms)));
84    conditions.push(col(GREPTIME_TIMESTAMP).lt_eq(lit_timestamp_millisecond(end_timestamp_ms)));
85
86    for m in label_matches {
87        let name = &m.name;
88
89        if name == METRIC_NAME_LABEL {
90            continue;
91        }
92
93        let value = &m.value;
94        let m_type = MatcherType::try_from(m.r#type).map_err(|e| {
95            error::InvalidPromRemoteRequestSnafu {
96                msg: format!("invalid LabelMatcher type, decode error: {e}",),
97            }
98            .build()
99        })?;
100
101        match m_type {
102            MatcherType::Eq => {
103                conditions.push(col(name).eq(lit(value)));
104            }
105            MatcherType::Neq => {
106                conditions.push(col(name).not_eq(lit(value)));
107            }
108            // Case sensitive regexp match
109            MatcherType::Re => {
110                conditions.push(regexp_match(col(name), lit(value), None).is_not_null());
111            }
112            // Case sensitive regexp not match
113            MatcherType::Nre => {
114                conditions.push(regexp_match(col(name), lit(value), None).is_null());
115            }
116        }
117    }
118
119    // Safety: conditions MUST not be empty, reduce always return Some(expr).
120    let conditions = conditions.into_iter().reduce(Expr::and).unwrap();
121
122    let dataframe = dataframe
123        .filter(conditions)
124        .context(error::DataFrameSnafu)?;
125
126    Ok(dataframe.into_parts().1)
127}
128
129#[inline]
130fn new_label(name: String, value: String) -> Label {
131    Label { name, value }
132}
133
134fn lit_timestamp_millisecond(ts: i64) -> Expr {
135    Expr::Literal(ScalarValue::TimestampMillisecond(Some(ts), None))
136}
137
138// A timeseries id
139#[derive(Debug)]
140struct TimeSeriesId {
141    labels: Vec<Label>,
142}
143
144/// Because Label in protobuf doesn't impl `Eq`, so we have to do it by ourselves.
145impl PartialEq for TimeSeriesId {
146    fn eq(&self, other: &Self) -> bool {
147        if self.labels.len() != other.labels.len() {
148            return false;
149        }
150
151        self.labels
152            .iter()
153            .zip(other.labels.iter())
154            .all(|(l, r)| l.name == r.name && l.value == r.value)
155    }
156}
157impl Eq for TimeSeriesId {}
158
159impl Hash for TimeSeriesId {
160    fn hash<H: Hasher>(&self, state: &mut H) {
161        for label in &self.labels {
162            label.name.hash(state);
163            label.value.hash(state);
164        }
165    }
166}
167
168/// For Sorting timeseries
169impl Ord for TimeSeriesId {
170    fn cmp(&self, other: &Self) -> Ordering {
171        let ordering = self.labels.len().cmp(&other.labels.len());
172        if ordering != Ordering::Equal {
173            return ordering;
174        }
175
176        for (l, r) in self.labels.iter().zip(other.labels.iter()) {
177            let ordering = l.name.cmp(&r.name);
178
179            if ordering != Ordering::Equal {
180                return ordering;
181            }
182
183            let ordering = l.value.cmp(&r.value);
184
185            if ordering != Ordering::Equal {
186                return ordering;
187            }
188        }
189        Ordering::Equal
190    }
191}
192
193impl PartialOrd for TimeSeriesId {
194    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
195        Some(self.cmp(other))
196    }
197}
198
199/// Collect each row's timeseries id
200/// This processing is ugly, hope <https://github.com/GreptimeTeam/greptimedb/issues/336> making some progress in future.
201fn collect_timeseries_ids(table_name: &str, recordbatch: &RecordBatch) -> Vec<TimeSeriesId> {
202    let row_count = recordbatch.num_rows();
203    let mut timeseries_ids = Vec::with_capacity(row_count);
204
205    for row in 0..row_count {
206        let mut labels = Vec::with_capacity(recordbatch.num_columns() - 1);
207        labels.push(new_label(
208            METRIC_NAME_LABEL.to_string(),
209            table_name.to_string(),
210        ));
211
212        for (i, column_schema) in recordbatch.schema.column_schemas().iter().enumerate() {
213            if column_schema.name == GREPTIME_VALUE || column_schema.name == GREPTIME_TIMESTAMP {
214                continue;
215            }
216
217            let column = &recordbatch.columns()[i];
218            // A label with an empty label value is considered equivalent to a label that does not exist.
219            if column.is_null(row) {
220                continue;
221            }
222
223            let value = column.get(row).to_string();
224            labels.push(new_label(column_schema.name.clone(), value));
225        }
226        timeseries_ids.push(TimeSeriesId { labels });
227    }
228    timeseries_ids
229}
230
231pub fn recordbatches_to_timeseries(
232    table_name: &str,
233    recordbatches: RecordBatches,
234) -> Result<Vec<TimeSeries>> {
235    Ok(recordbatches
236        .take()
237        .into_iter()
238        .map(|x| recordbatch_to_timeseries(table_name, x))
239        .collect::<Result<Vec<_>>>()?
240        .into_iter()
241        .flatten()
242        .collect())
243}
244
245fn recordbatch_to_timeseries(table: &str, recordbatch: RecordBatch) -> Result<Vec<TimeSeries>> {
246    let ts_column = recordbatch.column_by_name(GREPTIME_TIMESTAMP).context(
247        error::InvalidPromRemoteReadQueryResultSnafu {
248            msg: "missing greptime_timestamp column in query result",
249        },
250    )?;
251    ensure!(
252        ts_column.data_type() == ConcreteDataType::timestamp_millisecond_datatype(),
253        error::InvalidPromRemoteReadQueryResultSnafu {
254            msg: format!(
255                "Expect timestamp column of datatype Timestamp(Millisecond), actual {:?}",
256                ts_column.data_type()
257            )
258        }
259    );
260
261    let field_column = recordbatch.column_by_name(GREPTIME_VALUE).context(
262        error::InvalidPromRemoteReadQueryResultSnafu {
263            msg: "missing greptime_value column in query result",
264        },
265    )?;
266    ensure!(
267        field_column.data_type() == ConcreteDataType::float64_datatype(),
268        error::InvalidPromRemoteReadQueryResultSnafu {
269            msg: format!(
270                "Expect value column of datatype Float64, actual {:?}",
271                field_column.data_type()
272            )
273        }
274    );
275
276    // First, collect each row's timeseries id
277    let timeseries_ids = collect_timeseries_ids(table, &recordbatch);
278    // Then, group timeseries by it's id.
279    let mut timeseries_map: BTreeMap<&TimeSeriesId, TimeSeries> = BTreeMap::default();
280
281    for (row, timeseries_id) in timeseries_ids.iter().enumerate() {
282        let timeseries = timeseries_map
283            .entry(timeseries_id)
284            .or_insert_with(|| TimeSeries {
285                labels: timeseries_id.labels.clone(),
286                ..Default::default()
287            });
288
289        if ts_column.is_null(row) || field_column.is_null(row) {
290            continue;
291        }
292
293        let value: f64 = match field_column.get(row) {
294            Value::Float64(value) => value.into(),
295            _ => unreachable!("checked by the \"ensure\" above"),
296        };
297        let timestamp = match ts_column.get(row) {
298            Value::Timestamp(t) if t.unit() == TimeUnit::Millisecond => t.value(),
299            _ => unreachable!("checked by the \"ensure\" above"),
300        };
301        let sample = Sample { value, timestamp };
302
303        timeseries.samples.push(sample);
304    }
305
306    Ok(timeseries_map.into_values().collect())
307}
308
309pub fn to_grpc_row_insert_requests(request: &WriteRequest) -> Result<(RowInsertRequests, usize)> {
310    let _timer = crate::metrics::METRIC_HTTP_PROM_STORE_CONVERT_ELAPSED.start_timer();
311
312    let mut multi_table_data = MultiTableData::new();
313
314    for series in &request.timeseries {
315        let table_name = &series
316            .labels
317            .iter()
318            .find(|label| {
319                // The metric name is a special label
320                label.name == METRIC_NAME_LABEL
321            })
322            .context(error::InvalidPromRemoteRequestSnafu {
323                msg: "missing '__name__' label in time-series",
324            })?
325            .value;
326
327        // The metric name is a special label,
328        // num_columns = labels.len() - 1 + 1 (value) + 1 (timestamp)
329        let num_columns = series.labels.len() + 1;
330
331        let table_data = multi_table_data.get_or_default_table_data(
332            table_name,
333            num_columns,
334            series.samples.len(),
335        );
336
337        // labels
338        let kvs = series.labels.iter().filter_map(|label| {
339            if label.name == METRIC_NAME_LABEL {
340                None
341            } else {
342                Some((label.name.clone(), label.value.clone()))
343            }
344        });
345
346        if series.samples.len() == 1 {
347            let mut one_row = table_data.alloc_one_row();
348
349            row_writer::write_tags(table_data, kvs, &mut one_row)?;
350            // value
351            row_writer::write_f64(
352                table_data,
353                GREPTIME_VALUE,
354                series.samples[0].value,
355                &mut one_row,
356            )?;
357            // timestamp
358            row_writer::write_ts_to_millis(
359                table_data,
360                GREPTIME_TIMESTAMP,
361                Some(series.samples[0].timestamp),
362                Precision::Millisecond,
363                &mut one_row,
364            )?;
365
366            table_data.add_row(one_row);
367        } else {
368            for Sample { value, timestamp } in &series.samples {
369                let mut one_row = table_data.alloc_one_row();
370
371                // labels
372                let kvs = kvs.clone();
373                row_writer::write_tags(table_data, kvs, &mut one_row)?;
374                // value
375                row_writer::write_f64(table_data, GREPTIME_VALUE, *value, &mut one_row)?;
376                // timestamp
377                row_writer::write_ts_to_millis(
378                    table_data,
379                    GREPTIME_TIMESTAMP,
380                    Some(*timestamp),
381                    Precision::Millisecond,
382                    &mut one_row,
383                )?;
384
385                table_data.add_row(one_row);
386            }
387        }
388    }
389
390    Ok(multi_table_data.into_row_insert_requests())
391}
392
393#[inline]
394pub fn snappy_decompress(buf: &[u8]) -> Result<Vec<u8>> {
395    let mut decoder = Decoder::new();
396    decoder
397        .decompress_vec(buf)
398        .context(error::DecompressSnappyPromRemoteRequestSnafu)
399}
400
401#[inline]
402pub fn snappy_compress(buf: &[u8]) -> Result<Vec<u8>> {
403    let mut encoder = Encoder::new();
404    encoder
405        .compress_vec(buf)
406        .context(error::CompressPromRemoteRequestSnafu)
407}
408
409#[inline]
410pub fn zstd_decompress(buf: &[u8]) -> Result<Vec<u8>> {
411    zstd::stream::decode_all(buf).context(error::DecompressZstdPromRemoteRequestSnafu)
412}
413
414/// Mock timeseries for test, it is both used in servers and frontend crate
415/// So we present it here
416pub fn mock_timeseries() -> Vec<TimeSeries> {
417    vec![
418        TimeSeries {
419            labels: vec![
420                new_label(METRIC_NAME_LABEL.to_string(), "metric1".to_string()),
421                new_label("job".to_string(), "spark".to_string()),
422            ],
423            samples: vec![
424                Sample {
425                    value: 1.0f64,
426                    timestamp: 1000,
427                },
428                Sample {
429                    value: 2.0f64,
430                    timestamp: 2000,
431                },
432            ],
433            ..Default::default()
434        },
435        TimeSeries {
436            labels: vec![
437                new_label(METRIC_NAME_LABEL.to_string(), "metric2".to_string()),
438                new_label("instance".to_string(), "test_host1".to_string()),
439                new_label("idc".to_string(), "z001".to_string()),
440            ],
441            samples: vec![
442                Sample {
443                    value: 3.0f64,
444                    timestamp: 1000,
445                },
446                Sample {
447                    value: 4.0f64,
448                    timestamp: 2000,
449                },
450            ],
451            ..Default::default()
452        },
453        TimeSeries {
454            labels: vec![
455                new_label(METRIC_NAME_LABEL.to_string(), "metric3".to_string()),
456                new_label("idc".to_string(), "z002".to_string()),
457                new_label("app".to_string(), "biz".to_string()),
458            ],
459            samples: vec![
460                Sample {
461                    value: 5.0f64,
462                    timestamp: 1000,
463                },
464                Sample {
465                    value: 6.0f64,
466                    timestamp: 2000,
467                },
468                Sample {
469                    value: 7.0f64,
470                    timestamp: 3000,
471                },
472            ],
473            ..Default::default()
474        },
475    ]
476}
477
// Unit tests for query translation, remote-write conversion and
// remote-read result conversion.
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use api::prom_store::remote::LabelMatcher;
    use api::v1::{ColumnDataType, Row, SemanticType};
    use datafusion::prelude::SessionContext;
    use datatypes::schema::{ColumnSchema, Schema};
    use datatypes::vectors::{Float64Vector, StringVector, TimestampMillisecondVector};
    use table::table::adapter::DfTableProviderAdapter;
    use table::test_util::MemTable;

    use super::*;

    // Raw `i32` encodings of `MatcherType`, as stored in `LabelMatcher::r#type`.
    const EQ_TYPE: i32 = MatcherType::Eq as i32;
    const NEQ_TYPE: i32 = MatcherType::Neq as i32;
    const RE_TYPE: i32 = MatcherType::Re as i32;

    // `table_name` fails without a `__name__` matcher and returns its value otherwise.
    #[test]
    fn test_table_name() {
        let q = Query {
            start_timestamp_ms: 1000,
            end_timestamp_ms: 2000,
            matchers: vec![],
            ..Default::default()
        };
        let err = table_name(&q).unwrap_err();
        assert!(matches!(err, error::Error::InvalidPromRemoteRequest { .. }));

        let q = Query {
            start_timestamp_ms: 1000,
            end_timestamp_ms: 2000,
            matchers: vec![LabelMatcher {
                name: METRIC_NAME_LABEL.to_string(),
                value: "test".to_string(),
                r#type: EQ_TYPE,
            }],
            ..Default::default()
        };
        assert_eq!("test", table_name(&q).unwrap());
    }

    // Checks the rendered filter: the `__name__` matcher never becomes a
    // predicate, and other matchers are ANDed after the timestamp range.
    #[test]
    fn test_query_to_plan() {
        let q = Query {
            start_timestamp_ms: 1000,
            end_timestamp_ms: 2000,
            matchers: vec![LabelMatcher {
                name: METRIC_NAME_LABEL.to_string(),
                value: "test".to_string(),
                r#type: EQ_TYPE,
            }],
            ..Default::default()
        };

        let schema = Arc::new(Schema::new(vec![
            ColumnSchema::new(
                GREPTIME_TIMESTAMP,
                ConcreteDataType::timestamp_millisecond_datatype(),
                true,
            ),
            ColumnSchema::new(GREPTIME_VALUE, ConcreteDataType::float64_datatype(), true),
            ColumnSchema::new("instance", ConcreteDataType::string_datatype(), true),
            ColumnSchema::new("job", ConcreteDataType::string_datatype(), true),
        ]));
        let recordbatch = RecordBatch::new(
            schema,
            vec![
                Arc::new(TimestampMillisecondVector::from_vec(vec![1000])) as _,
                Arc::new(Float64Vector::from_vec(vec![3.0])) as _,
                Arc::new(StringVector::from(vec!["host1"])) as _,
                Arc::new(StringVector::from(vec!["job"])) as _,
            ],
        )
        .unwrap();

        let ctx = SessionContext::new();
        let table = MemTable::table("test", recordbatch);
        let table_provider = Arc::new(DfTableProviderAdapter::new(table));

        let dataframe = ctx.read_table(table_provider.clone()).unwrap();
        let plan = query_to_plan(DataFrame::DataFusion(dataframe), &q).unwrap();
        let display_string = format!("{}", plan.display_indent());

        assert_eq!("Filter: ?table?.greptime_timestamp >= TimestampMillisecond(1000, None) AND ?table?.greptime_timestamp <= TimestampMillisecond(2000, None)\n  TableScan: ?table?", display_string);

        // `*prom*` is not a valid regex. That is fine here: the plan is only
        // rendered, never executed.
        let q = Query {
            start_timestamp_ms: 1000,
            end_timestamp_ms: 2000,
            matchers: vec![
                LabelMatcher {
                    name: METRIC_NAME_LABEL.to_string(),
                    value: "test".to_string(),
                    r#type: EQ_TYPE,
                },
                LabelMatcher {
                    name: "job".to_string(),
                    value: "*prom*".to_string(),
                    r#type: RE_TYPE,
                },
                LabelMatcher {
                    name: "instance".to_string(),
                    value: "localhost".to_string(),
                    r#type: NEQ_TYPE,
                },
            ],
            ..Default::default()
        };

        let dataframe = ctx.read_table(table_provider).unwrap();
        let plan = query_to_plan(DataFrame::DataFusion(dataframe), &q).unwrap();
        let display_string = format!("{}", plan.display_indent());

        assert_eq!("Filter: ?table?.greptime_timestamp >= TimestampMillisecond(1000, None) AND ?table?.greptime_timestamp <= TimestampMillisecond(2000, None) AND regexp_match(?table?.job, Utf8(\"*prom*\")) IS NOT NULL AND ?table?.instance != Utf8(\"localhost\")\n  TableScan: ?table?", display_string);
    }

    // Expected gRPC column schemas: the given tag columns, followed by
    // `greptime_value` (field) and `greptime_timestamp` (time index).
    fn column_schemas_with(
        mut kts_iter: Vec<(&str, ColumnDataType, SemanticType)>,
    ) -> Vec<api::v1::ColumnSchema> {
        kts_iter.push((
            "greptime_value",
            ColumnDataType::Float64,
            SemanticType::Field,
        ));
        kts_iter.push((
            "greptime_timestamp",
            ColumnDataType::TimestampMillisecond,
            SemanticType::Timestamp,
        ));

        kts_iter
            .into_iter()
            .map(|(k, t, s)| api::v1::ColumnSchema {
                column_name: k.to_string(),
                datatype: t as i32,
                semantic_type: s as i32,
                ..Default::default()
            })
            .collect()
    }

    // Expected row with one string tag, a value and a millisecond timestamp.
    fn make_row_with_label(l1: &str, value: f64, timestamp: i64) -> Row {
        Row {
            values: vec![
                api::v1::Value {
                    value_data: Some(api::v1::value::ValueData::StringValue(l1.to_string())),
                },
                api::v1::Value {
                    value_data: Some(api::v1::value::ValueData::F64Value(value)),
                },
                api::v1::Value {
                    value_data: Some(api::v1::value::ValueData::TimestampMillisecondValue(
                        timestamp,
                    )),
                },
            ],
        }
    }

    // Expected row with two string tags, a value and a millisecond timestamp.
    fn make_row_with_2_labels(l1: &str, l2: &str, value: f64, timestamp: i64) -> Row {
        Row {
            values: vec![
                api::v1::Value {
                    value_data: Some(api::v1::value::ValueData::StringValue(l1.to_string())),
                },
                api::v1::Value {
                    value_data: Some(api::v1::value::ValueData::StringValue(l2.to_string())),
                },
                api::v1::Value {
                    value_data: Some(api::v1::value::ValueData::F64Value(value)),
                },
                api::v1::Value {
                    value_data: Some(api::v1::value::ValueData::TimestampMillisecondValue(
                        timestamp,
                    )),
                },
            ],
        }
    }

    // Converts `mock_timeseries()` and checks one insert per metric, each with
    // the expected schema and one row per sample.
    #[test]
    fn test_write_request_to_row_insert_exprs() {
        let write_request = WriteRequest {
            timeseries: mock_timeseries(),
            ..Default::default()
        };

        let mut exprs = to_grpc_row_insert_requests(&write_request)
            .unwrap()
            .0
            .inserts;
        exprs.sort_unstable_by(|l, r| l.table_name.cmp(&r.table_name));
        assert_eq!(3, exprs.len());
        assert_eq!("metric1", exprs[0].table_name);
        assert_eq!("metric2", exprs[1].table_name);
        assert_eq!("metric3", exprs[2].table_name);

        let rows = exprs[0].rows.as_ref().unwrap();
        let schema = &rows.schema;
        let rows = &rows.rows;
        assert_eq!(2, rows.len());
        assert_eq!(3, schema.len());
        assert_eq!(
            column_schemas_with(vec![("job", ColumnDataType::String, SemanticType::Tag)]),
            *schema
        );
        assert_eq!(
            &vec![
                make_row_with_label("spark", 1.0, 1000),
                make_row_with_label("spark", 2.0, 2000),
            ],
            rows
        );

        let rows = exprs[1].rows.as_ref().unwrap();
        let schema = &rows.schema;
        let rows = &rows.rows;
        assert_eq!(2, rows.len());
        assert_eq!(4, schema.len());
        assert_eq!(
            column_schemas_with(vec![
                ("instance", ColumnDataType::String, SemanticType::Tag),
                ("idc", ColumnDataType::String, SemanticType::Tag)
            ]),
            *schema
        );
        assert_eq!(
            &vec![
                make_row_with_2_labels("test_host1", "z001", 3.0, 1000),
                make_row_with_2_labels("test_host1", "z001", 4.0, 2000),
            ],
            rows
        );

        let rows = exprs[2].rows.as_ref().unwrap();
        let schema = &rows.schema;
        let rows = &rows.rows;
        assert_eq!(3, rows.len());
        assert_eq!(4, schema.len());
        assert_eq!(
            column_schemas_with(vec![
                ("idc", ColumnDataType::String, SemanticType::Tag),
                ("app", ColumnDataType::String, SemanticType::Tag)
            ]),
            *schema
        );
        assert_eq!(
            &vec![
                make_row_with_2_labels("z002", "biz", 5.0, 1000),
                make_row_with_2_labels("z002", "biz", 6.0, 2000),
                make_row_with_2_labels("z002", "biz", 7.0, 3000),
            ],
            rows
        );
    }

    // Two batches with distinct `instance` values must yield two series, each
    // labelled with `__name__ = metric1` and carrying its single sample.
    #[test]
    fn test_recordbatches_to_timeseries() {
        let schema = Arc::new(Schema::new(vec![
            ColumnSchema::new(
                GREPTIME_TIMESTAMP,
                ConcreteDataType::timestamp_millisecond_datatype(),
                true,
            ),
            ColumnSchema::new(GREPTIME_VALUE, ConcreteDataType::float64_datatype(), true),
            ColumnSchema::new("instance", ConcreteDataType::string_datatype(), true),
        ]));

        let recordbatches = RecordBatches::try_new(
            schema.clone(),
            vec![
                RecordBatch::new(
                    schema.clone(),
                    vec![
                        Arc::new(TimestampMillisecondVector::from_vec(vec![1000])) as _,
                        Arc::new(Float64Vector::from_vec(vec![3.0])) as _,
                        Arc::new(StringVector::from(vec!["host1"])) as _,
                    ],
                )
                .unwrap(),
                RecordBatch::new(
                    schema,
                    vec![
                        Arc::new(TimestampMillisecondVector::from_vec(vec![2000])) as _,
                        Arc::new(Float64Vector::from_vec(vec![7.0])) as _,
                        Arc::new(StringVector::from(vec!["host2"])) as _,
                    ],
                )
                .unwrap(),
            ],
        )
        .unwrap();

        let timeseries = recordbatches_to_timeseries("metric1", recordbatches).unwrap();
        assert_eq!(2, timeseries.len());

        assert_eq!(
            vec![
                Label {
                    name: METRIC_NAME_LABEL.to_string(),
                    value: "metric1".to_string(),
                },
                Label {
                    name: "instance".to_string(),
                    value: "host1".to_string(),
                },
            ],
            timeseries[0].labels
        );

        assert_eq!(
            timeseries[0].samples,
            vec![Sample {
                value: 3.0,
                timestamp: 1000,
            }]
        );

        assert_eq!(
            vec![
                Label {
                    name: METRIC_NAME_LABEL.to_string(),
                    value: "metric1".to_string(),
                },
                Label {
                    name: "instance".to_string(),
                    value: "host2".to_string(),
                },
            ],
            timeseries[1].labels
        );
        assert_eq!(
            timeseries[1].samples,
            vec![Sample {
                value: 7.0,
                timestamp: 2000,
            }]
        );
    }
}