servers/
prom_store.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! prometheus protocol supportings
16//! handles prometheus remote_write, remote_read logic
17use std::cmp::Ordering;
18use std::collections::BTreeMap;
19use std::hash::{Hash, Hasher};
20
21use api::prom_store::remote::label_matcher::Type as MatcherType;
22use api::prom_store::remote::{Label, Query, ReadRequest, Sample, TimeSeries, WriteRequest};
23use api::v1::RowInsertRequests;
24use common_grpc::precision::Precision;
25use common_query::prelude::{GREPTIME_TIMESTAMP, GREPTIME_VALUE};
26use common_recordbatch::{RecordBatch, RecordBatches};
27use common_telemetry::tracing;
28use common_time::timestamp::TimeUnit;
29use datafusion::prelude::{col, lit, regexp_match, Expr};
30use datafusion_common::ScalarValue;
31use datafusion_expr::LogicalPlan;
32use datatypes::prelude::{ConcreteDataType, Value};
33use openmetrics_parser::{MetricsExposition, PrometheusType, PrometheusValue};
34use query::dataframe::DataFrame;
35use snafu::{ensure, OptionExt, ResultExt};
36use snap::raw::{Decoder, Encoder};
37
38use crate::error::{self, Result};
39use crate::row_writer::{self, MultiTableData};
40
/// Special label holding the metric name; its value is used as the table name.
pub const METRIC_NAME_LABEL: &str = "__name__";
/// Byte form of [`METRIC_NAME_LABEL`], derived from it so the two cannot drift apart.
pub const METRIC_NAME_LABEL_BYTES: &[u8] = METRIC_NAME_LABEL.as_bytes();

/// Special label naming the target database. Remote read falls back to it when no
/// [`SCHEMA_LABEL`] equality matcher is present.
pub const DATABASE_LABEL: &str = "__database__";
/// Byte form of [`DATABASE_LABEL`].
pub const DATABASE_LABEL_BYTES: &[u8] = DATABASE_LABEL.as_bytes();

/// Special label naming the target schema; preferred over [`DATABASE_LABEL`] in remote read.
pub const SCHEMA_LABEL: &str = "__schema__";
/// Byte form of [`SCHEMA_LABEL`].
pub const SCHEMA_LABEL_BYTES: &[u8] = SCHEMA_LABEL.as_bytes();

/// Special label `__physical_table__`.
pub const PHYSICAL_TABLE_LABEL: &str = "__physical_table__";
/// Byte form of [`PHYSICAL_TABLE_LABEL`].
pub const PHYSICAL_TABLE_LABEL_BYTES: &[u8] = PHYSICAL_TABLE_LABEL.as_bytes();

/// The same as `FIELD_COLUMN_MATCHER` in `promql` crate
pub const FIELD_NAME_LABEL: &str = "__field__";
55
56/// Metrics for push gateway protocol
57pub struct Metrics {
58    pub exposition: MetricsExposition<PrometheusType, PrometheusValue>,
59}
60
61/// Get table name from remote query
62pub fn table_name(q: &Query) -> Result<String> {
63    let label_matches = &q.matchers;
64
65    label_matches
66        .iter()
67        .find_map(|m| {
68            if m.name == METRIC_NAME_LABEL {
69                Some(m.value.to_string())
70            } else {
71                None
72            }
73        })
74        .context(error::InvalidPromRemoteRequestSnafu {
75            msg: "missing '__name__' label in timeseries",
76        })
77}
78
79/// Extract schema from remote read request. Returns the first schema found from any query's matchers.
80/// Prioritizes __schema__ over __database__ labels.
81pub fn extract_schema_from_read_request(request: &ReadRequest) -> Option<String> {
82    for query in &request.queries {
83        for matcher in &query.matchers {
84            if matcher.name == SCHEMA_LABEL && matcher.r#type == MatcherType::Eq as i32 {
85                return Some(matcher.value.clone());
86            }
87        }
88    }
89
90    // If no __schema__ found, look for __database__
91    for query in &request.queries {
92        for matcher in &query.matchers {
93            if matcher.name == DATABASE_LABEL && matcher.r#type == MatcherType::Eq as i32 {
94                return Some(matcher.value.clone());
95            }
96        }
97    }
98
99    None
100}
101
102/// Create a DataFrame from a remote Query
103#[tracing::instrument(skip_all)]
104pub fn query_to_plan(dataframe: DataFrame, q: &Query) -> Result<LogicalPlan> {
105    let DataFrame::DataFusion(dataframe) = dataframe;
106
107    let start_timestamp_ms = q.start_timestamp_ms;
108    let end_timestamp_ms = q.end_timestamp_ms;
109
110    let label_matches = &q.matchers;
111
112    let mut conditions = Vec::with_capacity(label_matches.len() + 1);
113
114    conditions.push(col(GREPTIME_TIMESTAMP).gt_eq(lit_timestamp_millisecond(start_timestamp_ms)));
115    conditions.push(col(GREPTIME_TIMESTAMP).lt_eq(lit_timestamp_millisecond(end_timestamp_ms)));
116
117    for m in label_matches {
118        let name = &m.name;
119
120        if name == METRIC_NAME_LABEL || name == SCHEMA_LABEL || name == DATABASE_LABEL {
121            continue;
122        }
123
124        let value = &m.value;
125        let m_type = MatcherType::try_from(m.r#type).map_err(|e| {
126            error::InvalidPromRemoteRequestSnafu {
127                msg: format!("invalid LabelMatcher type, decode error: {e}",),
128            }
129            .build()
130        })?;
131
132        match m_type {
133            MatcherType::Eq => {
134                conditions.push(col(name).eq(lit(value)));
135            }
136            MatcherType::Neq => {
137                conditions.push(col(name).not_eq(lit(value)));
138            }
139            // Case sensitive regexp match
140            MatcherType::Re => {
141                conditions.push(regexp_match(col(name), lit(value), None).is_not_null());
142            }
143            // Case sensitive regexp not match
144            MatcherType::Nre => {
145                conditions.push(regexp_match(col(name), lit(value), None).is_null());
146            }
147        }
148    }
149
150    // Safety: conditions MUST not be empty, reduce always return Some(expr).
151    let conditions = conditions.into_iter().reduce(Expr::and).unwrap();
152
153    let dataframe = dataframe
154        .filter(conditions)
155        .context(error::DataFrameSnafu)?;
156
157    Ok(dataframe.into_parts().1)
158}
159
160#[inline]
161fn new_label(name: String, value: String) -> Label {
162    Label { name, value }
163}
164
165fn lit_timestamp_millisecond(ts: i64) -> Expr {
166    Expr::Literal(ScalarValue::TimestampMillisecond(Some(ts), None), None)
167}
168
169// A timeseries id
170#[derive(Debug)]
171struct TimeSeriesId {
172    labels: Vec<Label>,
173}
174
175/// Because Label in protobuf doesn't impl `Eq`, so we have to do it by ourselves.
176impl PartialEq for TimeSeriesId {
177    fn eq(&self, other: &Self) -> bool {
178        if self.labels.len() != other.labels.len() {
179            return false;
180        }
181
182        self.labels
183            .iter()
184            .zip(other.labels.iter())
185            .all(|(l, r)| l.name == r.name && l.value == r.value)
186    }
187}
188impl Eq for TimeSeriesId {}
189
190impl Hash for TimeSeriesId {
191    fn hash<H: Hasher>(&self, state: &mut H) {
192        for label in &self.labels {
193            label.name.hash(state);
194            label.value.hash(state);
195        }
196    }
197}
198
199/// For Sorting timeseries
200impl Ord for TimeSeriesId {
201    fn cmp(&self, other: &Self) -> Ordering {
202        let ordering = self.labels.len().cmp(&other.labels.len());
203        if ordering != Ordering::Equal {
204            return ordering;
205        }
206
207        for (l, r) in self.labels.iter().zip(other.labels.iter()) {
208            let ordering = l.name.cmp(&r.name);
209
210            if ordering != Ordering::Equal {
211                return ordering;
212            }
213
214            let ordering = l.value.cmp(&r.value);
215
216            if ordering != Ordering::Equal {
217                return ordering;
218            }
219        }
220        Ordering::Equal
221    }
222}
223
224impl PartialOrd for TimeSeriesId {
225    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
226        Some(self.cmp(other))
227    }
228}
229
230/// Collect each row's timeseries id
231/// This processing is ugly, hope <https://github.com/GreptimeTeam/greptimedb/issues/336> making some progress in future.
232fn collect_timeseries_ids(table_name: &str, recordbatch: &RecordBatch) -> Vec<TimeSeriesId> {
233    let row_count = recordbatch.num_rows();
234    let mut timeseries_ids = Vec::with_capacity(row_count);
235
236    for row in 0..row_count {
237        let mut labels = Vec::with_capacity(recordbatch.num_columns() - 1);
238        labels.push(new_label(
239            METRIC_NAME_LABEL.to_string(),
240            table_name.to_string(),
241        ));
242
243        for (i, column_schema) in recordbatch.schema.column_schemas().iter().enumerate() {
244            if column_schema.name == GREPTIME_VALUE || column_schema.name == GREPTIME_TIMESTAMP {
245                continue;
246            }
247
248            let column = &recordbatch.columns()[i];
249            // A label with an empty label value is considered equivalent to a label that does not exist.
250            if column.is_null(row) {
251                continue;
252            }
253
254            let value = column.get(row).to_string();
255            labels.push(new_label(column_schema.name.clone(), value));
256        }
257        timeseries_ids.push(TimeSeriesId { labels });
258    }
259    timeseries_ids
260}
261
262pub fn recordbatches_to_timeseries(
263    table_name: &str,
264    recordbatches: RecordBatches,
265) -> Result<Vec<TimeSeries>> {
266    Ok(recordbatches
267        .take()
268        .into_iter()
269        .map(|x| recordbatch_to_timeseries(table_name, x))
270        .collect::<Result<Vec<_>>>()?
271        .into_iter()
272        .flatten()
273        .collect())
274}
275
276fn recordbatch_to_timeseries(table: &str, recordbatch: RecordBatch) -> Result<Vec<TimeSeries>> {
277    let ts_column = recordbatch.column_by_name(GREPTIME_TIMESTAMP).context(
278        error::InvalidPromRemoteReadQueryResultSnafu {
279            msg: "missing greptime_timestamp column in query result",
280        },
281    )?;
282    ensure!(
283        ts_column.data_type() == ConcreteDataType::timestamp_millisecond_datatype(),
284        error::InvalidPromRemoteReadQueryResultSnafu {
285            msg: format!(
286                "Expect timestamp column of datatype Timestamp(Millisecond), actual {:?}",
287                ts_column.data_type()
288            )
289        }
290    );
291
292    let field_column = recordbatch.column_by_name(GREPTIME_VALUE).context(
293        error::InvalidPromRemoteReadQueryResultSnafu {
294            msg: "missing greptime_value column in query result",
295        },
296    )?;
297    ensure!(
298        field_column.data_type() == ConcreteDataType::float64_datatype(),
299        error::InvalidPromRemoteReadQueryResultSnafu {
300            msg: format!(
301                "Expect value column of datatype Float64, actual {:?}",
302                field_column.data_type()
303            )
304        }
305    );
306
307    // First, collect each row's timeseries id
308    let timeseries_ids = collect_timeseries_ids(table, &recordbatch);
309    // Then, group timeseries by it's id.
310    let mut timeseries_map: BTreeMap<&TimeSeriesId, TimeSeries> = BTreeMap::default();
311
312    for (row, timeseries_id) in timeseries_ids.iter().enumerate() {
313        let timeseries = timeseries_map
314            .entry(timeseries_id)
315            .or_insert_with(|| TimeSeries {
316                labels: timeseries_id.labels.clone(),
317                ..Default::default()
318            });
319
320        if ts_column.is_null(row) || field_column.is_null(row) {
321            continue;
322        }
323
324        let value: f64 = match field_column.get(row) {
325            Value::Float64(value) => value.into(),
326            _ => unreachable!("checked by the \"ensure\" above"),
327        };
328        let timestamp = match ts_column.get(row) {
329            Value::Timestamp(t) if t.unit() == TimeUnit::Millisecond => t.value(),
330            _ => unreachable!("checked by the \"ensure\" above"),
331        };
332        let sample = Sample { value, timestamp };
333
334        timeseries.samples.push(sample);
335    }
336
337    Ok(timeseries_map.into_values().collect())
338}
339
340pub fn to_grpc_row_insert_requests(request: &WriteRequest) -> Result<(RowInsertRequests, usize)> {
341    let _timer = crate::metrics::METRIC_HTTP_PROM_STORE_CONVERT_ELAPSED.start_timer();
342
343    let mut multi_table_data = MultiTableData::new();
344
345    for series in &request.timeseries {
346        let table_name = &series
347            .labels
348            .iter()
349            .find(|label| {
350                // The metric name is a special label
351                label.name == METRIC_NAME_LABEL
352            })
353            .context(error::InvalidPromRemoteRequestSnafu {
354                msg: "missing '__name__' label in time-series",
355            })?
356            .value;
357
358        // The metric name is a special label,
359        // num_columns = labels.len() - 1 + 1 (value) + 1 (timestamp)
360        let num_columns = series.labels.len() + 1;
361
362        let table_data = multi_table_data.get_or_default_table_data(
363            table_name,
364            num_columns,
365            series.samples.len(),
366        );
367
368        // labels
369        let kvs = series.labels.iter().filter_map(|label| {
370            if label.name == METRIC_NAME_LABEL {
371                None
372            } else {
373                Some((label.name.clone(), label.value.clone()))
374            }
375        });
376
377        if series.samples.len() == 1 {
378            let mut one_row = table_data.alloc_one_row();
379
380            row_writer::write_tags(table_data, kvs, &mut one_row)?;
381            // value
382            row_writer::write_f64(
383                table_data,
384                GREPTIME_VALUE,
385                series.samples[0].value,
386                &mut one_row,
387            )?;
388            // timestamp
389            row_writer::write_ts_to_millis(
390                table_data,
391                GREPTIME_TIMESTAMP,
392                Some(series.samples[0].timestamp),
393                Precision::Millisecond,
394                &mut one_row,
395            )?;
396
397            table_data.add_row(one_row);
398        } else {
399            for Sample { value, timestamp } in &series.samples {
400                let mut one_row = table_data.alloc_one_row();
401
402                // labels
403                let kvs = kvs.clone();
404                row_writer::write_tags(table_data, kvs, &mut one_row)?;
405                // value
406                row_writer::write_f64(table_data, GREPTIME_VALUE, *value, &mut one_row)?;
407                // timestamp
408                row_writer::write_ts_to_millis(
409                    table_data,
410                    GREPTIME_TIMESTAMP,
411                    Some(*timestamp),
412                    Precision::Millisecond,
413                    &mut one_row,
414                )?;
415
416                table_data.add_row(one_row);
417            }
418        }
419    }
420
421    Ok(multi_table_data.into_row_insert_requests())
422}
423
424#[inline]
425pub fn snappy_decompress(buf: &[u8]) -> Result<Vec<u8>> {
426    let mut decoder = Decoder::new();
427    decoder
428        .decompress_vec(buf)
429        .context(error::DecompressSnappyPromRemoteRequestSnafu)
430}
431
432#[inline]
433pub fn snappy_compress(buf: &[u8]) -> Result<Vec<u8>> {
434    let mut encoder = Encoder::new();
435    encoder
436        .compress_vec(buf)
437        .context(error::CompressPromRemoteRequestSnafu)
438}
439
440#[inline]
441pub fn zstd_decompress(buf: &[u8]) -> Result<Vec<u8>> {
442    zstd::stream::decode_all(buf).context(error::DecompressZstdPromRemoteRequestSnafu)
443}
444
445/// Mock timeseries for test, it is both used in servers and frontend crate
446/// So we present it here
447pub fn mock_timeseries() -> Vec<TimeSeries> {
448    vec![
449        TimeSeries {
450            labels: vec![
451                new_label(METRIC_NAME_LABEL.to_string(), "metric1".to_string()),
452                new_label("job".to_string(), "spark".to_string()),
453            ],
454            samples: vec![
455                Sample {
456                    value: 1.0f64,
457                    timestamp: 1000,
458                },
459                Sample {
460                    value: 2.0f64,
461                    timestamp: 2000,
462                },
463            ],
464            ..Default::default()
465        },
466        TimeSeries {
467            labels: vec![
468                new_label(METRIC_NAME_LABEL.to_string(), "metric2".to_string()),
469                new_label("instance".to_string(), "test_host1".to_string()),
470                new_label("idc".to_string(), "z001".to_string()),
471            ],
472            samples: vec![
473                Sample {
474                    value: 3.0f64,
475                    timestamp: 1000,
476                },
477                Sample {
478                    value: 4.0f64,
479                    timestamp: 2000,
480                },
481            ],
482            ..Default::default()
483        },
484        TimeSeries {
485            labels: vec![
486                new_label(METRIC_NAME_LABEL.to_string(), "metric3".to_string()),
487                new_label("idc".to_string(), "z002".to_string()),
488                new_label("app".to_string(), "biz".to_string()),
489            ],
490            samples: vec![
491                Sample {
492                    value: 5.0f64,
493                    timestamp: 1000,
494                },
495                Sample {
496                    value: 6.0f64,
497                    timestamp: 2000,
498                },
499                Sample {
500                    value: 7.0f64,
501                    timestamp: 3000,
502                },
503            ],
504            ..Default::default()
505        },
506    ]
507}
508
509/// Add new labels to the mock timeseries.
510pub fn mock_timeseries_new_label() -> Vec<TimeSeries> {
511    let ts_demo_metrics = TimeSeries {
512        labels: vec![
513            new_label(METRIC_NAME_LABEL.to_string(), "demo_metrics".to_string()),
514            new_label("idc".to_string(), "idc3".to_string()),
515            new_label("new_label1".to_string(), "foo".to_string()),
516        ],
517        samples: vec![Sample {
518            value: 42.0,
519            timestamp: 3000,
520        }],
521        ..Default::default()
522    };
523    let ts_multi_labels = TimeSeries {
524        labels: vec![
525            new_label(METRIC_NAME_LABEL.to_string(), "metric1".to_string()),
526            new_label("idc".to_string(), "idc4".to_string()),
527            new_label("env".to_string(), "prod".to_string()),
528            new_label("host".to_string(), "host9".to_string()),
529            new_label("new_label2".to_string(), "bar".to_string()),
530        ],
531        samples: vec![Sample {
532            value: 99.0,
533            timestamp: 4000,
534        }],
535        ..Default::default()
536    };
537
538    vec![ts_demo_metrics, ts_multi_labels]
539}
540
541/// Add new labels to the mock timeseries.
542pub fn mock_timeseries_special_labels() -> Vec<TimeSeries> {
543    let idc3_schema = TimeSeries {
544        labels: vec![
545            new_label(METRIC_NAME_LABEL.to_string(), "idc3_lo_table".to_string()),
546            new_label("__database__".to_string(), "idc3".to_string()),
547            new_label("__physical_table__".to_string(), "f1".to_string()),
548        ],
549        samples: vec![Sample {
550            value: 42.0,
551            timestamp: 3000,
552        }],
553        ..Default::default()
554    };
555    let idc4_schema = TimeSeries {
556        labels: vec![
557            new_label(
558                METRIC_NAME_LABEL.to_string(),
559                "idc4_local_table".to_string(),
560            ),
561            new_label("__database__".to_string(), "idc4".to_string()),
562            new_label("__physical_table__".to_string(), "f2".to_string()),
563        ],
564        samples: vec![Sample {
565            value: 99.0,
566            timestamp: 4000,
567        }],
568        ..Default::default()
569    };
570
571    vec![idc3_schema, idc4_schema]
572}
573
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use api::prom_store::remote::LabelMatcher;
    use api::v1::value::ValueData;
    use api::v1::{ColumnDataType, Row, SemanticType};
    use datafusion::prelude::SessionContext;
    use datatypes::schema::{ColumnSchema, Schema};
    use datatypes::vectors::{Float64Vector, StringVector, TimestampMillisecondVector};
    use table::table::adapter::DfTableProviderAdapter;
    use table::test_util::MemTable;

    use super::*;

    const EQ_TYPE: i32 = MatcherType::Eq as i32;
    const NEQ_TYPE: i32 = MatcherType::Neq as i32;
    const RE_TYPE: i32 = MatcherType::Re as i32;

    /// Shorthand for a [`LabelMatcher`] with the given name, value and raw matcher type.
    fn matcher(name: &str, value: &str, matcher_type: i32) -> LabelMatcher {
        LabelMatcher {
            name: name.to_string(),
            value: value.to_string(),
            r#type: matcher_type,
        }
    }

    #[test]
    fn test_table_name() {
        let q = Query {
            start_timestamp_ms: 1000,
            end_timestamp_ms: 2000,
            matchers: vec![],
            ..Default::default()
        };
        let err = table_name(&q).unwrap_err();
        assert!(matches!(err, error::Error::InvalidPromRemoteRequest { .. }));

        let q = Query {
            start_timestamp_ms: 1000,
            end_timestamp_ms: 2000,
            matchers: vec![LabelMatcher {
                name: METRIC_NAME_LABEL.to_string(),
                value: "test".to_string(),
                r#type: EQ_TYPE,
            }],
            ..Default::default()
        };
        assert_eq!("test", table_name(&q).unwrap());
    }

    #[test]
    fn test_extract_schema_from_read_request() {
        let request = |queries: Vec<Vec<LabelMatcher>>| ReadRequest {
            queries: queries
                .into_iter()
                .map(|matchers| Query {
                    matchers,
                    ..Default::default()
                })
                .collect(),
            ..Default::default()
        };

        // No special labels at all.
        assert_eq!(None, extract_schema_from_read_request(&request(vec![])));

        // Only `__database__` is present.
        let req = request(vec![vec![
            matcher(METRIC_NAME_LABEL, "test", EQ_TYPE),
            matcher(DATABASE_LABEL, "db", EQ_TYPE),
        ]]);
        assert_eq!(
            Some("db".to_string()),
            extract_schema_from_read_request(&req)
        );

        // `__schema__` wins over `__database__`, even when it appears in a later query.
        let req = request(vec![
            vec![matcher(DATABASE_LABEL, "db", EQ_TYPE)],
            vec![matcher(SCHEMA_LABEL, "schema", EQ_TYPE)],
        ]);
        assert_eq!(
            Some("schema".to_string()),
            extract_schema_from_read_request(&req)
        );

        // Only equality matchers are considered.
        let req = request(vec![vec![
            matcher(SCHEMA_LABEL, "schema", NEQ_TYPE),
            matcher(DATABASE_LABEL, "db.*", RE_TYPE),
        ]]);
        assert_eq!(None, extract_schema_from_read_request(&req));
    }

    #[test]
    fn test_query_to_plan() {
        let q = Query {
            start_timestamp_ms: 1000,
            end_timestamp_ms: 2000,
            matchers: vec![LabelMatcher {
                name: METRIC_NAME_LABEL.to_string(),
                value: "test".to_string(),
                r#type: EQ_TYPE,
            }],
            ..Default::default()
        };

        let schema = Arc::new(Schema::new(vec![
            ColumnSchema::new(
                GREPTIME_TIMESTAMP,
                ConcreteDataType::timestamp_millisecond_datatype(),
                true,
            ),
            ColumnSchema::new(GREPTIME_VALUE, ConcreteDataType::float64_datatype(), true),
            ColumnSchema::new("instance", ConcreteDataType::string_datatype(), true),
            ColumnSchema::new("job", ConcreteDataType::string_datatype(), true),
        ]));
        let recordbatch = RecordBatch::new(
            schema,
            vec![
                Arc::new(TimestampMillisecondVector::from_vec(vec![1000])) as _,
                Arc::new(Float64Vector::from_vec(vec![3.0])) as _,
                Arc::new(StringVector::from(vec!["host1"])) as _,
                Arc::new(StringVector::from(vec!["job"])) as _,
            ],
        )
        .unwrap();

        let ctx = SessionContext::new();
        let table = MemTable::table("test", recordbatch);
        let table_provider = Arc::new(DfTableProviderAdapter::new(table));

        let dataframe = ctx.read_table(table_provider.clone()).unwrap();
        let plan = query_to_plan(DataFrame::DataFusion(dataframe), &q).unwrap();
        let display_string = format!("{}", plan.display_indent());

        assert_eq!("Filter: ?table?.greptime_timestamp >= TimestampMillisecond(1000, None) AND ?table?.greptime_timestamp <= TimestampMillisecond(2000, None)\n  TableScan: ?table?", display_string);

        let q = Query {
            start_timestamp_ms: 1000,
            end_timestamp_ms: 2000,
            matchers: vec![
                LabelMatcher {
                    name: METRIC_NAME_LABEL.to_string(),
                    value: "test".to_string(),
                    r#type: EQ_TYPE,
                },
                LabelMatcher {
                    name: "job".to_string(),
                    value: "*prom*".to_string(),
                    r#type: RE_TYPE,
                },
                LabelMatcher {
                    name: "instance".to_string(),
                    value: "localhost".to_string(),
                    r#type: NEQ_TYPE,
                },
            ],
            ..Default::default()
        };

        let dataframe = ctx.read_table(table_provider).unwrap();
        let plan = query_to_plan(DataFrame::DataFusion(dataframe), &q).unwrap();
        let display_string = format!("{}", plan.display_indent());

        assert_eq!("Filter: ?table?.greptime_timestamp >= TimestampMillisecond(1000, None) AND ?table?.greptime_timestamp <= TimestampMillisecond(2000, None) AND regexp_match(?table?.job, Utf8(\"*prom*\")) IS NOT NULL AND ?table?.instance != Utf8(\"localhost\")\n  TableScan: ?table?", display_string);
    }

    /// Expected column schemas: the given tag columns, then value and timestamp.
    fn column_schemas_with(
        mut kts_iter: Vec<(&str, ColumnDataType, SemanticType)>,
    ) -> Vec<api::v1::ColumnSchema> {
        kts_iter.push((
            "greptime_value",
            ColumnDataType::Float64,
            SemanticType::Field,
        ));
        kts_iter.push((
            "greptime_timestamp",
            ColumnDataType::TimestampMillisecond,
            SemanticType::Timestamp,
        ));

        kts_iter
            .into_iter()
            .map(|(k, t, s)| api::v1::ColumnSchema {
                column_name: k.to_string(),
                datatype: t as i32,
                semantic_type: s as i32,
                ..Default::default()
            })
            .collect()
    }

    /// Expected row: one string value per tag, then the f64 value and the millisecond timestamp.
    fn make_row(labels: &[&str], value: f64, timestamp: i64) -> Row {
        let values = labels
            .iter()
            .map(|label| ValueData::StringValue(label.to_string()))
            .chain([
                ValueData::F64Value(value),
                ValueData::TimestampMillisecondValue(timestamp),
            ])
            .map(|value_data| api::v1::Value {
                value_data: Some(value_data),
            })
            .collect();
        Row { values }
    }

    #[test]
    fn test_write_request_to_row_insert_exprs() {
        let write_request = WriteRequest {
            timeseries: mock_timeseries(),
            ..Default::default()
        };

        let mut exprs = to_grpc_row_insert_requests(&write_request)
            .unwrap()
            .0
            .inserts;
        exprs.sort_unstable_by(|l, r| l.table_name.cmp(&r.table_name));
        assert_eq!(3, exprs.len());
        assert_eq!("metric1", exprs[0].table_name);
        assert_eq!("metric2", exprs[1].table_name);
        assert_eq!("metric3", exprs[2].table_name);

        let rows = exprs[0].rows.as_ref().unwrap();
        let schema = &rows.schema;
        let rows = &rows.rows;
        assert_eq!(2, rows.len());
        assert_eq!(3, schema.len());
        assert_eq!(
            column_schemas_with(vec![("job", ColumnDataType::String, SemanticType::Tag)]),
            *schema
        );
        assert_eq!(
            &vec![
                make_row(&["spark"], 1.0, 1000),
                make_row(&["spark"], 2.0, 2000),
            ],
            rows
        );

        let rows = exprs[1].rows.as_ref().unwrap();
        let schema = &rows.schema;
        let rows = &rows.rows;
        assert_eq!(2, rows.len());
        assert_eq!(4, schema.len());
        assert_eq!(
            column_schemas_with(vec![
                ("instance", ColumnDataType::String, SemanticType::Tag),
                ("idc", ColumnDataType::String, SemanticType::Tag)
            ]),
            *schema
        );
        assert_eq!(
            &vec![
                make_row(&["test_host1", "z001"], 3.0, 1000),
                make_row(&["test_host1", "z001"], 4.0, 2000),
            ],
            rows
        );

        let rows = exprs[2].rows.as_ref().unwrap();
        let schema = &rows.schema;
        let rows = &rows.rows;
        assert_eq!(3, rows.len());
        assert_eq!(4, schema.len());
        assert_eq!(
            column_schemas_with(vec![
                ("idc", ColumnDataType::String, SemanticType::Tag),
                ("app", ColumnDataType::String, SemanticType::Tag)
            ]),
            *schema
        );
        assert_eq!(
            &vec![
                make_row(&["z002", "biz"], 5.0, 1000),
                make_row(&["z002", "biz"], 6.0, 2000),
                make_row(&["z002", "biz"], 7.0, 3000),
            ],
            rows
        );
    }

    #[test]
    fn test_recordbatches_to_timeseries() {
        let schema = Arc::new(Schema::new(vec![
            ColumnSchema::new(
                GREPTIME_TIMESTAMP,
                ConcreteDataType::timestamp_millisecond_datatype(),
                true,
            ),
            ColumnSchema::new(GREPTIME_VALUE, ConcreteDataType::float64_datatype(), true),
            ColumnSchema::new("instance", ConcreteDataType::string_datatype(), true),
        ]));

        let recordbatches = RecordBatches::try_new(
            schema.clone(),
            vec![
                RecordBatch::new(
                    schema.clone(),
                    vec![
                        Arc::new(TimestampMillisecondVector::from_vec(vec![1000])) as _,
                        Arc::new(Float64Vector::from_vec(vec![3.0])) as _,
                        Arc::new(StringVector::from(vec!["host1"])) as _,
                    ],
                )
                .unwrap(),
                RecordBatch::new(
                    schema,
                    vec![
                        Arc::new(TimestampMillisecondVector::from_vec(vec![2000])) as _,
                        Arc::new(Float64Vector::from_vec(vec![7.0])) as _,
                        Arc::new(StringVector::from(vec!["host2"])) as _,
                    ],
                )
                .unwrap(),
            ],
        )
        .unwrap();

        let timeseries = recordbatches_to_timeseries("metric1", recordbatches).unwrap();
        assert_eq!(2, timeseries.len());

        assert_eq!(
            vec![
                Label {
                    name: METRIC_NAME_LABEL.to_string(),
                    value: "metric1".to_string(),
                },
                Label {
                    name: "instance".to_string(),
                    value: "host1".to_string(),
                },
            ],
            timeseries[0].labels
        );

        assert_eq!(
            timeseries[0].samples,
            vec![Sample {
                value: 3.0,
                timestamp: 1000,
            }]
        );

        assert_eq!(
            vec![
                Label {
                    name: METRIC_NAME_LABEL.to_string(),
                    value: "metric1".to_string(),
                },
                Label {
                    name: "instance".to_string(),
                    value: "host2".to_string(),
                },
            ],
            timeseries[1].labels
        );
        assert_eq!(
            timeseries[1].samples,
            vec![Sample {
                value: 7.0,
                timestamp: 2000,
            }]
        );
    }

    #[test]
    fn test_compression_round_trip() {
        let payload = b"prometheus remote write payload".repeat(8);

        let compressed = snappy_compress(&payload).unwrap();
        assert_eq!(payload, snappy_decompress(&compressed).unwrap());

        let compressed = zstd::stream::encode_all(payload.as_slice(), 0).unwrap();
        assert_eq!(payload, zstd_decompress(&compressed).unwrap());
    }
}