servers/
prom_store.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
//! Prometheus protocol support.
//! Handles the Prometheus remote_write and remote_read logic.
17use std::cmp::Ordering;
18use std::collections::BTreeMap;
19use std::hash::{Hash, Hasher};
20
21use api::prom_store::remote::label_matcher::Type as MatcherType;
22use api::prom_store::remote::{Label, Query, Sample, TimeSeries, WriteRequest};
23use api::v1::RowInsertRequests;
24use common_grpc::precision::Precision;
25use common_query::prelude::{GREPTIME_TIMESTAMP, GREPTIME_VALUE};
26use common_recordbatch::{RecordBatch, RecordBatches};
27use common_telemetry::tracing;
28use common_time::timestamp::TimeUnit;
29use datafusion::prelude::{col, lit, regexp_match, Expr};
30use datafusion_common::ScalarValue;
31use datafusion_expr::LogicalPlan;
32use datatypes::prelude::{ConcreteDataType, Value};
33use openmetrics_parser::{MetricsExposition, PrometheusType, PrometheusValue};
34use query::dataframe::DataFrame;
35use snafu::{ensure, OptionExt, ResultExt};
36use snap::raw::{Decoder, Encoder};
37
38use crate::error::{self, Result};
39use crate::row_writer::{self, MultiTableData};
40
/// Label carrying the metric name; its value is used as the table name.
pub const METRIC_NAME_LABEL: &str = "__name__";
/// Byte form of [`METRIC_NAME_LABEL`], derived from it so the two can never drift apart.
pub const METRIC_NAME_LABEL_BYTES: &[u8] = METRIC_NAME_LABEL.as_bytes();

/// Special `__database__` label.
/// NOTE(review): presumably selects the target database; it is consumed outside this file.
pub const DATABASE_LABEL: &str = "__database__";
/// Byte form of [`DATABASE_LABEL`], derived from it so the two can never drift apart.
pub const DATABASE_LABEL_BYTES: &[u8] = DATABASE_LABEL.as_bytes();

/// Special `__physical_table__` label.
/// NOTE(review): presumably selects the physical table; it is consumed outside this file.
pub const PHYSICAL_TABLE_LABEL: &str = "__physical_table__";
/// Byte form of [`PHYSICAL_TABLE_LABEL`], derived from it so the two can never drift apart.
pub const PHYSICAL_TABLE_LABEL_BYTES: &[u8] = PHYSICAL_TABLE_LABEL.as_bytes();

/// The same as `FIELD_COLUMN_MATCHER` in the `promql` crate.
pub const FIELD_NAME_LABEL: &str = "__field__";
52
/// Metrics received through the push gateway protocol.
pub struct Metrics {
    /// The exposition as produced by `openmetrics_parser`, typed with its Prometheus
    /// metric-type and value variants.
    pub exposition: MetricsExposition<PrometheusType, PrometheusValue>,
}
57
58/// Get table name from remote query
59pub fn table_name(q: &Query) -> Result<String> {
60    let label_matches = &q.matchers;
61
62    label_matches
63        .iter()
64        .find_map(|m| {
65            if m.name == METRIC_NAME_LABEL {
66                Some(m.value.to_string())
67            } else {
68                None
69            }
70        })
71        .context(error::InvalidPromRemoteRequestSnafu {
72            msg: "missing '__name__' label in timeseries",
73        })
74}
75
76/// Create a DataFrame from a remote Query
77#[tracing::instrument(skip_all)]
78pub fn query_to_plan(dataframe: DataFrame, q: &Query) -> Result<LogicalPlan> {
79    let DataFrame::DataFusion(dataframe) = dataframe;
80
81    let start_timestamp_ms = q.start_timestamp_ms;
82    let end_timestamp_ms = q.end_timestamp_ms;
83
84    let label_matches = &q.matchers;
85
86    let mut conditions = Vec::with_capacity(label_matches.len() + 1);
87
88    conditions.push(col(GREPTIME_TIMESTAMP).gt_eq(lit_timestamp_millisecond(start_timestamp_ms)));
89    conditions.push(col(GREPTIME_TIMESTAMP).lt_eq(lit_timestamp_millisecond(end_timestamp_ms)));
90
91    for m in label_matches {
92        let name = &m.name;
93
94        if name == METRIC_NAME_LABEL {
95            continue;
96        }
97
98        let value = &m.value;
99        let m_type = MatcherType::try_from(m.r#type).map_err(|e| {
100            error::InvalidPromRemoteRequestSnafu {
101                msg: format!("invalid LabelMatcher type, decode error: {e}",),
102            }
103            .build()
104        })?;
105
106        match m_type {
107            MatcherType::Eq => {
108                conditions.push(col(name).eq(lit(value)));
109            }
110            MatcherType::Neq => {
111                conditions.push(col(name).not_eq(lit(value)));
112            }
113            // Case sensitive regexp match
114            MatcherType::Re => {
115                conditions.push(regexp_match(col(name), lit(value), None).is_not_null());
116            }
117            // Case sensitive regexp not match
118            MatcherType::Nre => {
119                conditions.push(regexp_match(col(name), lit(value), None).is_null());
120            }
121        }
122    }
123
124    // Safety: conditions MUST not be empty, reduce always return Some(expr).
125    let conditions = conditions.into_iter().reduce(Expr::and).unwrap();
126
127    let dataframe = dataframe
128        .filter(conditions)
129        .context(error::DataFrameSnafu)?;
130
131    Ok(dataframe.into_parts().1)
132}
133
134#[inline]
135fn new_label(name: String, value: String) -> Label {
136    Label { name, value }
137}
138
139fn lit_timestamp_millisecond(ts: i64) -> Expr {
140    Expr::Literal(ScalarValue::TimestampMillisecond(Some(ts), None))
141}
142
143// A timeseries id
144#[derive(Debug)]
145struct TimeSeriesId {
146    labels: Vec<Label>,
147}
148
149/// Because Label in protobuf doesn't impl `Eq`, so we have to do it by ourselves.
150impl PartialEq for TimeSeriesId {
151    fn eq(&self, other: &Self) -> bool {
152        if self.labels.len() != other.labels.len() {
153            return false;
154        }
155
156        self.labels
157            .iter()
158            .zip(other.labels.iter())
159            .all(|(l, r)| l.name == r.name && l.value == r.value)
160    }
161}
162impl Eq for TimeSeriesId {}
163
164impl Hash for TimeSeriesId {
165    fn hash<H: Hasher>(&self, state: &mut H) {
166        for label in &self.labels {
167            label.name.hash(state);
168            label.value.hash(state);
169        }
170    }
171}
172
173/// For Sorting timeseries
174impl Ord for TimeSeriesId {
175    fn cmp(&self, other: &Self) -> Ordering {
176        let ordering = self.labels.len().cmp(&other.labels.len());
177        if ordering != Ordering::Equal {
178            return ordering;
179        }
180
181        for (l, r) in self.labels.iter().zip(other.labels.iter()) {
182            let ordering = l.name.cmp(&r.name);
183
184            if ordering != Ordering::Equal {
185                return ordering;
186            }
187
188            let ordering = l.value.cmp(&r.value);
189
190            if ordering != Ordering::Equal {
191                return ordering;
192            }
193        }
194        Ordering::Equal
195    }
196}
197
198impl PartialOrd for TimeSeriesId {
199    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
200        Some(self.cmp(other))
201    }
202}
203
204/// Collect each row's timeseries id
205/// This processing is ugly, hope <https://github.com/GreptimeTeam/greptimedb/issues/336> making some progress in future.
206fn collect_timeseries_ids(table_name: &str, recordbatch: &RecordBatch) -> Vec<TimeSeriesId> {
207    let row_count = recordbatch.num_rows();
208    let mut timeseries_ids = Vec::with_capacity(row_count);
209
210    for row in 0..row_count {
211        let mut labels = Vec::with_capacity(recordbatch.num_columns() - 1);
212        labels.push(new_label(
213            METRIC_NAME_LABEL.to_string(),
214            table_name.to_string(),
215        ));
216
217        for (i, column_schema) in recordbatch.schema.column_schemas().iter().enumerate() {
218            if column_schema.name == GREPTIME_VALUE || column_schema.name == GREPTIME_TIMESTAMP {
219                continue;
220            }
221
222            let column = &recordbatch.columns()[i];
223            // A label with an empty label value is considered equivalent to a label that does not exist.
224            if column.is_null(row) {
225                continue;
226            }
227
228            let value = column.get(row).to_string();
229            labels.push(new_label(column_schema.name.clone(), value));
230        }
231        timeseries_ids.push(TimeSeriesId { labels });
232    }
233    timeseries_ids
234}
235
236pub fn recordbatches_to_timeseries(
237    table_name: &str,
238    recordbatches: RecordBatches,
239) -> Result<Vec<TimeSeries>> {
240    Ok(recordbatches
241        .take()
242        .into_iter()
243        .map(|x| recordbatch_to_timeseries(table_name, x))
244        .collect::<Result<Vec<_>>>()?
245        .into_iter()
246        .flatten()
247        .collect())
248}
249
250fn recordbatch_to_timeseries(table: &str, recordbatch: RecordBatch) -> Result<Vec<TimeSeries>> {
251    let ts_column = recordbatch.column_by_name(GREPTIME_TIMESTAMP).context(
252        error::InvalidPromRemoteReadQueryResultSnafu {
253            msg: "missing greptime_timestamp column in query result",
254        },
255    )?;
256    ensure!(
257        ts_column.data_type() == ConcreteDataType::timestamp_millisecond_datatype(),
258        error::InvalidPromRemoteReadQueryResultSnafu {
259            msg: format!(
260                "Expect timestamp column of datatype Timestamp(Millisecond), actual {:?}",
261                ts_column.data_type()
262            )
263        }
264    );
265
266    let field_column = recordbatch.column_by_name(GREPTIME_VALUE).context(
267        error::InvalidPromRemoteReadQueryResultSnafu {
268            msg: "missing greptime_value column in query result",
269        },
270    )?;
271    ensure!(
272        field_column.data_type() == ConcreteDataType::float64_datatype(),
273        error::InvalidPromRemoteReadQueryResultSnafu {
274            msg: format!(
275                "Expect value column of datatype Float64, actual {:?}",
276                field_column.data_type()
277            )
278        }
279    );
280
281    // First, collect each row's timeseries id
282    let timeseries_ids = collect_timeseries_ids(table, &recordbatch);
283    // Then, group timeseries by it's id.
284    let mut timeseries_map: BTreeMap<&TimeSeriesId, TimeSeries> = BTreeMap::default();
285
286    for (row, timeseries_id) in timeseries_ids.iter().enumerate() {
287        let timeseries = timeseries_map
288            .entry(timeseries_id)
289            .or_insert_with(|| TimeSeries {
290                labels: timeseries_id.labels.clone(),
291                ..Default::default()
292            });
293
294        if ts_column.is_null(row) || field_column.is_null(row) {
295            continue;
296        }
297
298        let value: f64 = match field_column.get(row) {
299            Value::Float64(value) => value.into(),
300            _ => unreachable!("checked by the \"ensure\" above"),
301        };
302        let timestamp = match ts_column.get(row) {
303            Value::Timestamp(t) if t.unit() == TimeUnit::Millisecond => t.value(),
304            _ => unreachable!("checked by the \"ensure\" above"),
305        };
306        let sample = Sample { value, timestamp };
307
308        timeseries.samples.push(sample);
309    }
310
311    Ok(timeseries_map.into_values().collect())
312}
313
314pub fn to_grpc_row_insert_requests(request: &WriteRequest) -> Result<(RowInsertRequests, usize)> {
315    let _timer = crate::metrics::METRIC_HTTP_PROM_STORE_CONVERT_ELAPSED.start_timer();
316
317    let mut multi_table_data = MultiTableData::new();
318
319    for series in &request.timeseries {
320        let table_name = &series
321            .labels
322            .iter()
323            .find(|label| {
324                // The metric name is a special label
325                label.name == METRIC_NAME_LABEL
326            })
327            .context(error::InvalidPromRemoteRequestSnafu {
328                msg: "missing '__name__' label in time-series",
329            })?
330            .value;
331
332        // The metric name is a special label,
333        // num_columns = labels.len() - 1 + 1 (value) + 1 (timestamp)
334        let num_columns = series.labels.len() + 1;
335
336        let table_data = multi_table_data.get_or_default_table_data(
337            table_name,
338            num_columns,
339            series.samples.len(),
340        );
341
342        // labels
343        let kvs = series.labels.iter().filter_map(|label| {
344            if label.name == METRIC_NAME_LABEL {
345                None
346            } else {
347                Some((label.name.clone(), label.value.clone()))
348            }
349        });
350
351        if series.samples.len() == 1 {
352            let mut one_row = table_data.alloc_one_row();
353
354            row_writer::write_tags(table_data, kvs, &mut one_row)?;
355            // value
356            row_writer::write_f64(
357                table_data,
358                GREPTIME_VALUE,
359                series.samples[0].value,
360                &mut one_row,
361            )?;
362            // timestamp
363            row_writer::write_ts_to_millis(
364                table_data,
365                GREPTIME_TIMESTAMP,
366                Some(series.samples[0].timestamp),
367                Precision::Millisecond,
368                &mut one_row,
369            )?;
370
371            table_data.add_row(one_row);
372        } else {
373            for Sample { value, timestamp } in &series.samples {
374                let mut one_row = table_data.alloc_one_row();
375
376                // labels
377                let kvs = kvs.clone();
378                row_writer::write_tags(table_data, kvs, &mut one_row)?;
379                // value
380                row_writer::write_f64(table_data, GREPTIME_VALUE, *value, &mut one_row)?;
381                // timestamp
382                row_writer::write_ts_to_millis(
383                    table_data,
384                    GREPTIME_TIMESTAMP,
385                    Some(*timestamp),
386                    Precision::Millisecond,
387                    &mut one_row,
388                )?;
389
390                table_data.add_row(one_row);
391            }
392        }
393    }
394
395    Ok(multi_table_data.into_row_insert_requests())
396}
397
398#[inline]
399pub fn snappy_decompress(buf: &[u8]) -> Result<Vec<u8>> {
400    let mut decoder = Decoder::new();
401    decoder
402        .decompress_vec(buf)
403        .context(error::DecompressSnappyPromRemoteRequestSnafu)
404}
405
406#[inline]
407pub fn snappy_compress(buf: &[u8]) -> Result<Vec<u8>> {
408    let mut encoder = Encoder::new();
409    encoder
410        .compress_vec(buf)
411        .context(error::CompressPromRemoteRequestSnafu)
412}
413
414#[inline]
415pub fn zstd_decompress(buf: &[u8]) -> Result<Vec<u8>> {
416    zstd::stream::decode_all(buf).context(error::DecompressZstdPromRemoteRequestSnafu)
417}
418
419/// Mock timeseries for test, it is both used in servers and frontend crate
420/// So we present it here
421pub fn mock_timeseries() -> Vec<TimeSeries> {
422    vec![
423        TimeSeries {
424            labels: vec![
425                new_label(METRIC_NAME_LABEL.to_string(), "metric1".to_string()),
426                new_label("job".to_string(), "spark".to_string()),
427            ],
428            samples: vec![
429                Sample {
430                    value: 1.0f64,
431                    timestamp: 1000,
432                },
433                Sample {
434                    value: 2.0f64,
435                    timestamp: 2000,
436                },
437            ],
438            ..Default::default()
439        },
440        TimeSeries {
441            labels: vec![
442                new_label(METRIC_NAME_LABEL.to_string(), "metric2".to_string()),
443                new_label("instance".to_string(), "test_host1".to_string()),
444                new_label("idc".to_string(), "z001".to_string()),
445            ],
446            samples: vec![
447                Sample {
448                    value: 3.0f64,
449                    timestamp: 1000,
450                },
451                Sample {
452                    value: 4.0f64,
453                    timestamp: 2000,
454                },
455            ],
456            ..Default::default()
457        },
458        TimeSeries {
459            labels: vec![
460                new_label(METRIC_NAME_LABEL.to_string(), "metric3".to_string()),
461                new_label("idc".to_string(), "z002".to_string()),
462                new_label("app".to_string(), "biz".to_string()),
463            ],
464            samples: vec![
465                Sample {
466                    value: 5.0f64,
467                    timestamp: 1000,
468                },
469                Sample {
470                    value: 6.0f64,
471                    timestamp: 2000,
472                },
473                Sample {
474                    value: 7.0f64,
475                    timestamp: 3000,
476                },
477            ],
478            ..Default::default()
479        },
480    ]
481}
482
483/// Add new labels to the mock timeseries.
484pub fn mock_timeseries_new_label() -> Vec<TimeSeries> {
485    let ts_demo_metrics = TimeSeries {
486        labels: vec![
487            new_label(METRIC_NAME_LABEL.to_string(), "demo_metrics".to_string()),
488            new_label("idc".to_string(), "idc3".to_string()),
489            new_label("new_label1".to_string(), "foo".to_string()),
490        ],
491        samples: vec![Sample {
492            value: 42.0,
493            timestamp: 3000,
494        }],
495        ..Default::default()
496    };
497    let ts_multi_labels = TimeSeries {
498        labels: vec![
499            new_label(METRIC_NAME_LABEL.to_string(), "metric1".to_string()),
500            new_label("idc".to_string(), "idc4".to_string()),
501            new_label("env".to_string(), "prod".to_string()),
502            new_label("host".to_string(), "host9".to_string()),
503            new_label("new_label2".to_string(), "bar".to_string()),
504        ],
505        samples: vec![Sample {
506            value: 99.0,
507            timestamp: 4000,
508        }],
509        ..Default::default()
510    };
511
512    vec![ts_demo_metrics, ts_multi_labels]
513}
514
515/// Add new labels to the mock timeseries.
516pub fn mock_timeseries_special_labels() -> Vec<TimeSeries> {
517    let idc3_schema = TimeSeries {
518        labels: vec![
519            new_label(METRIC_NAME_LABEL.to_string(), "idc3_lo_table".to_string()),
520            new_label("__database__".to_string(), "idc3".to_string()),
521            new_label("__physical_table__".to_string(), "f1".to_string()),
522        ],
523        samples: vec![Sample {
524            value: 42.0,
525            timestamp: 3000,
526        }],
527        ..Default::default()
528    };
529    let idc4_schema = TimeSeries {
530        labels: vec![
531            new_label(
532                METRIC_NAME_LABEL.to_string(),
533                "idc4_local_table".to_string(),
534            ),
535            new_label("__database__".to_string(), "idc4".to_string()),
536            new_label("__physical_table__".to_string(), "f2".to_string()),
537        ],
538        samples: vec![Sample {
539            value: 99.0,
540            timestamp: 4000,
541        }],
542        ..Default::default()
543    };
544
545    vec![idc3_schema, idc4_schema]
546}
547
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use api::prom_store::remote::LabelMatcher;
    use api::v1::{ColumnDataType, Row, SemanticType};
    use datafusion::prelude::SessionContext;
    use datatypes::schema::{ColumnSchema, Schema};
    use datatypes::vectors::{Float64Vector, StringVector, TimestampMillisecondVector};
    use table::table::adapter::DfTableProviderAdapter;
    use table::test_util::MemTable;

    use super::*;

    // Integer values of the matcher types, as carried in `LabelMatcher::r#type`.
    const EQ_TYPE: i32 = MatcherType::Eq as i32;
    const NEQ_TYPE: i32 = MatcherType::Neq as i32;
    const RE_TYPE: i32 = MatcherType::Re as i32;

    /// `table_name` fails without a `__name__` matcher and returns its value otherwise.
    #[test]
    fn test_table_name() {
        let q = Query {
            start_timestamp_ms: 1000,
            end_timestamp_ms: 2000,
            matchers: vec![],
            ..Default::default()
        };
        let err = table_name(&q).unwrap_err();
        assert!(matches!(err, error::Error::InvalidPromRemoteRequest { .. }));

        let q = Query {
            start_timestamp_ms: 1000,
            end_timestamp_ms: 2000,
            matchers: vec![LabelMatcher {
                name: METRIC_NAME_LABEL.to_string(),
                value: "test".to_string(),
                r#type: EQ_TYPE,
            }],
            ..Default::default()
        };
        assert_eq!("test", table_name(&q).unwrap());
    }

    /// `query_to_plan` always filters on the time range, drops the `__name__` matcher and
    /// appends the remaining matchers in query order.
    #[test]
    fn test_query_to_plan() {
        let q = Query {
            start_timestamp_ms: 1000,
            end_timestamp_ms: 2000,
            matchers: vec![LabelMatcher {
                name: METRIC_NAME_LABEL.to_string(),
                value: "test".to_string(),
                r#type: EQ_TYPE,
            }],
            ..Default::default()
        };

        let schema = Arc::new(Schema::new(vec![
            ColumnSchema::new(
                GREPTIME_TIMESTAMP,
                ConcreteDataType::timestamp_millisecond_datatype(),
                true,
            ),
            ColumnSchema::new(GREPTIME_VALUE, ConcreteDataType::float64_datatype(), true),
            ColumnSchema::new("instance", ConcreteDataType::string_datatype(), true),
            ColumnSchema::new("job", ConcreteDataType::string_datatype(), true),
        ]));
        let recordbatch = RecordBatch::new(
            schema,
            vec![
                Arc::new(TimestampMillisecondVector::from_vec(vec![1000])) as _,
                Arc::new(Float64Vector::from_vec(vec![3.0])) as _,
                Arc::new(StringVector::from(vec!["host1"])) as _,
                Arc::new(StringVector::from(vec!["job"])) as _,
            ],
        )
        .unwrap();

        let ctx = SessionContext::new();
        let table = MemTable::table("test", recordbatch);
        let table_provider = Arc::new(DfTableProviderAdapter::new(table));

        let dataframe = ctx.read_table(table_provider.clone()).unwrap();
        let plan = query_to_plan(DataFrame::DataFusion(dataframe), &q).unwrap();
        let display_string = format!("{}", plan.display_indent());

        // Only the `__name__` matcher was given, so just the time-range bounds remain.
        assert_eq!("Filter: ?table?.greptime_timestamp >= TimestampMillisecond(1000, None) AND ?table?.greptime_timestamp <= TimestampMillisecond(2000, None)\n  TableScan: ?table?", display_string);

        let q = Query {
            start_timestamp_ms: 1000,
            end_timestamp_ms: 2000,
            matchers: vec![
                LabelMatcher {
                    name: METRIC_NAME_LABEL.to_string(),
                    value: "test".to_string(),
                    r#type: EQ_TYPE,
                },
                LabelMatcher {
                    name: "job".to_string(),
                    value: "*prom*".to_string(),
                    r#type: RE_TYPE,
                },
                LabelMatcher {
                    name: "instance".to_string(),
                    value: "localhost".to_string(),
                    r#type: NEQ_TYPE,
                },
            ],
            ..Default::default()
        };

        let dataframe = ctx.read_table(table_provider).unwrap();
        let plan = query_to_plan(DataFrame::DataFusion(dataframe), &q).unwrap();
        let display_string = format!("{}", plan.display_indent());

        // Only the plan text is checked, so the regex pattern itself is never compiled here.
        assert_eq!("Filter: ?table?.greptime_timestamp >= TimestampMillisecond(1000, None) AND ?table?.greptime_timestamp <= TimestampMillisecond(2000, None) AND regexp_match(?table?.job, Utf8(\"*prom*\")) IS NOT NULL AND ?table?.instance != Utf8(\"localhost\")\n  TableScan: ?table?", display_string);
    }

    /// Builds the expected gRPC column schemas: the given tag columns followed by the
    /// `greptime_value` field and the `greptime_timestamp` timestamp column.
    fn column_schemas_with(
        mut kts_iter: Vec<(&str, ColumnDataType, SemanticType)>,
    ) -> Vec<api::v1::ColumnSchema> {
        kts_iter.push((
            "greptime_value",
            ColumnDataType::Float64,
            SemanticType::Field,
        ));
        kts_iter.push((
            "greptime_timestamp",
            ColumnDataType::TimestampMillisecond,
            SemanticType::Timestamp,
        ));

        kts_iter
            .into_iter()
            .map(|(k, t, s)| api::v1::ColumnSchema {
                column_name: k.to_string(),
                datatype: t as i32,
                semantic_type: s as i32,
                ..Default::default()
            })
            .collect()
    }

    /// Expected row with one string tag, a float value and a millisecond timestamp.
    fn make_row_with_label(l1: &str, value: f64, timestamp: i64) -> Row {
        Row {
            values: vec![
                api::v1::Value {
                    value_data: Some(api::v1::value::ValueData::StringValue(l1.to_string())),
                },
                api::v1::Value {
                    value_data: Some(api::v1::value::ValueData::F64Value(value)),
                },
                api::v1::Value {
                    value_data: Some(api::v1::value::ValueData::TimestampMillisecondValue(
                        timestamp,
                    )),
                },
            ],
        }
    }

    /// Expected row with two string tags, a float value and a millisecond timestamp.
    fn make_row_with_2_labels(l1: &str, l2: &str, value: f64, timestamp: i64) -> Row {
        Row {
            values: vec![
                api::v1::Value {
                    value_data: Some(api::v1::value::ValueData::StringValue(l1.to_string())),
                },
                api::v1::Value {
                    value_data: Some(api::v1::value::ValueData::StringValue(l2.to_string())),
                },
                api::v1::Value {
                    value_data: Some(api::v1::value::ValueData::F64Value(value)),
                },
                api::v1::Value {
                    value_data: Some(api::v1::value::ValueData::TimestampMillisecondValue(
                        timestamp,
                    )),
                },
            ],
        }
    }

    /// Each mock series becomes its own table. Every sample becomes a row with the
    /// series' tags, value and timestamp.
    #[test]
    fn test_write_request_to_row_insert_exprs() {
        let write_request = WriteRequest {
            timeseries: mock_timeseries(),
            ..Default::default()
        };

        let mut exprs = to_grpc_row_insert_requests(&write_request)
            .unwrap()
            .0
            .inserts;
        // Sort by table name so the assertions below do not depend on output order.
        exprs.sort_unstable_by(|l, r| l.table_name.cmp(&r.table_name));
        assert_eq!(3, exprs.len());
        assert_eq!("metric1", exprs[0].table_name);
        assert_eq!("metric2", exprs[1].table_name);
        assert_eq!("metric3", exprs[2].table_name);

        let rows = exprs[0].rows.as_ref().unwrap();
        let schema = &rows.schema;
        let rows = &rows.rows;
        assert_eq!(2, rows.len());
        assert_eq!(3, schema.len());
        assert_eq!(
            column_schemas_with(vec![("job", ColumnDataType::String, SemanticType::Tag)]),
            *schema
        );
        assert_eq!(
            &vec![
                make_row_with_label("spark", 1.0, 1000),
                make_row_with_label("spark", 2.0, 2000),
            ],
            rows
        );

        let rows = exprs[1].rows.as_ref().unwrap();
        let schema = &rows.schema;
        let rows = &rows.rows;
        assert_eq!(2, rows.len());
        assert_eq!(4, schema.len());
        assert_eq!(
            column_schemas_with(vec![
                ("instance", ColumnDataType::String, SemanticType::Tag),
                ("idc", ColumnDataType::String, SemanticType::Tag)
            ]),
            *schema
        );
        assert_eq!(
            &vec![
                make_row_with_2_labels("test_host1", "z001", 3.0, 1000),
                make_row_with_2_labels("test_host1", "z001", 4.0, 2000),
            ],
            rows
        );

        let rows = exprs[2].rows.as_ref().unwrap();
        let schema = &rows.schema;
        let rows = &rows.rows;
        assert_eq!(3, rows.len());
        assert_eq!(4, schema.len());
        assert_eq!(
            column_schemas_with(vec![
                ("idc", ColumnDataType::String, SemanticType::Tag),
                ("app", ColumnDataType::String, SemanticType::Tag)
            ]),
            *schema
        );
        assert_eq!(
            &vec![
                make_row_with_2_labels("z002", "biz", 5.0, 1000),
                make_row_with_2_labels("z002", "biz", 6.0, 2000),
                make_row_with_2_labels("z002", "biz", 7.0, 3000),
            ],
            rows
        );
    }

    /// Two record batches with distinct `instance` values yield two series. Each carries
    /// `__name__` plus `instance` and holds a single sample.
    #[test]
    fn test_recordbatches_to_timeseries() {
        let schema = Arc::new(Schema::new(vec![
            ColumnSchema::new(
                GREPTIME_TIMESTAMP,
                ConcreteDataType::timestamp_millisecond_datatype(),
                true,
            ),
            ColumnSchema::new(GREPTIME_VALUE, ConcreteDataType::float64_datatype(), true),
            ColumnSchema::new("instance", ConcreteDataType::string_datatype(), true),
        ]));

        let recordbatches = RecordBatches::try_new(
            schema.clone(),
            vec![
                RecordBatch::new(
                    schema.clone(),
                    vec![
                        Arc::new(TimestampMillisecondVector::from_vec(vec![1000])) as _,
                        Arc::new(Float64Vector::from_vec(vec![3.0])) as _,
                        Arc::new(StringVector::from(vec!["host1"])) as _,
                    ],
                )
                .unwrap(),
                RecordBatch::new(
                    schema,
                    vec![
                        Arc::new(TimestampMillisecondVector::from_vec(vec![2000])) as _,
                        Arc::new(Float64Vector::from_vec(vec![7.0])) as _,
                        Arc::new(StringVector::from(vec!["host2"])) as _,
                    ],
                )
                .unwrap(),
            ],
        )
        .unwrap();

        let timeseries = recordbatches_to_timeseries("metric1", recordbatches).unwrap();
        assert_eq!(2, timeseries.len());

        assert_eq!(
            vec![
                Label {
                    name: METRIC_NAME_LABEL.to_string(),
                    value: "metric1".to_string(),
                },
                Label {
                    name: "instance".to_string(),
                    value: "host1".to_string(),
                },
            ],
            timeseries[0].labels
        );

        assert_eq!(
            timeseries[0].samples,
            vec![Sample {
                value: 3.0,
                timestamp: 1000,
            }]
        );

        assert_eq!(
            vec![
                Label {
                    name: METRIC_NAME_LABEL.to_string(),
                    value: "metric1".to_string(),
                },
                Label {
                    name: "instance".to_string(),
                    value: "host2".to_string(),
                },
            ],
            timeseries[1].labels
        );
        assert_eq!(
            timeseries[1].samples,
            vec![Sample {
                value: 7.0,
                timestamp: 2000,
            }]
        );
    }
}
885        );
886    }
887}