servers/
prom_store.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
//! Prometheus protocol support.
//! Handles Prometheus remote_write and remote_read logic.
17use std::cmp::Ordering;
18use std::collections::BTreeMap;
19use std::hash::{Hash, Hasher};
20
21use api::prom_store::remote::label_matcher::Type as MatcherType;
22use api::prom_store::remote::{Label, Query, ReadRequest, Sample, TimeSeries, WriteRequest};
23use api::v1::RowInsertRequests;
24use common_grpc::precision::Precision;
25use common_query::prelude::{greptime_timestamp, greptime_value};
26use common_recordbatch::{RecordBatch, RecordBatches};
27use common_telemetry::tracing;
28use common_time::timestamp::TimeUnit;
29use datafusion::prelude::{Expr, col, lit, regexp_match};
30use datafusion_common::ScalarValue;
31use datafusion_expr::LogicalPlan;
32use datatypes::prelude::{ConcreteDataType, Value};
33use openmetrics_parser::{MetricsExposition, PrometheusType, PrometheusValue};
34use query::dataframe::DataFrame;
35use snafu::{OptionExt, ResultExt, ensure};
36use snap::raw::{Decoder, Encoder};
37
38use crate::error::{self, Result};
39use crate::row_writer::{self, MultiTableData};
40
/// Label carrying the metric name; its value is used as the table name.
pub const METRIC_NAME_LABEL: &str = "__name__";
/// Byte form of [`METRIC_NAME_LABEL`].
pub const METRIC_NAME_LABEL_BYTES: &[u8] = METRIC_NAME_LABEL.as_bytes();

/// Label naming the target database; in remote read, `__schema__` takes precedence over it.
pub const DATABASE_LABEL: &str = "__database__";
/// Byte form of [`DATABASE_LABEL`].
pub const DATABASE_LABEL_BYTES: &[u8] = DATABASE_LABEL.as_bytes();

/// Label naming the target schema; preferred over [`DATABASE_LABEL`] in remote read.
pub const SCHEMA_LABEL: &str = "__schema__";
/// Byte form of [`SCHEMA_LABEL`].
pub const SCHEMA_LABEL_BYTES: &[u8] = SCHEMA_LABEL.as_bytes();

/// Label naming a physical table (presumably the one backing a logical metric table — confirm
/// against the write path).
pub const PHYSICAL_TABLE_LABEL: &str = "__physical_table__";
/// Byte form of [`PHYSICAL_TABLE_LABEL`].
pub const PHYSICAL_TABLE_LABEL_BYTES: &[u8] = PHYSICAL_TABLE_LABEL.as_bytes();

/// The same as `FIELD_COLUMN_MATCHER` in `promql` crate
pub const FIELD_NAME_LABEL: &str = "__field__";
55
/// Metrics payload for the Prometheus push gateway protocol, held as an
/// `openmetrics_parser` exposition of Prometheus-typed metric families and values.
pub struct Metrics {
    pub exposition: MetricsExposition<PrometheusType, PrometheusValue>,
}
60
61/// Get table name from remote query
62pub fn table_name(q: &Query) -> Result<String> {
63    let label_matches = &q.matchers;
64
65    label_matches
66        .iter()
67        .find_map(|m| {
68            if m.name == METRIC_NAME_LABEL {
69                Some(m.value.clone())
70            } else {
71                None
72            }
73        })
74        .context(error::InvalidPromRemoteRequestSnafu {
75            msg: "missing '__name__' label in timeseries",
76        })
77}
78
79/// Extract schema from remote read request. Returns the first schema found from any query's matchers.
80/// Prioritizes __schema__ over __database__ labels.
81pub fn extract_schema_from_read_request(request: &ReadRequest) -> Option<String> {
82    for query in &request.queries {
83        for matcher in &query.matchers {
84            if matcher.name == SCHEMA_LABEL && matcher.r#type == MatcherType::Eq as i32 {
85                return Some(matcher.value.clone());
86            }
87        }
88    }
89
90    // If no __schema__ found, look for __database__
91    for query in &request.queries {
92        for matcher in &query.matchers {
93            if matcher.name == DATABASE_LABEL && matcher.r#type == MatcherType::Eq as i32 {
94                return Some(matcher.value.clone());
95            }
96        }
97    }
98
99    None
100}
101
102/// Create a DataFrame from a remote Query
103#[tracing::instrument(skip_all)]
104pub fn query_to_plan(dataframe: DataFrame, q: &Query) -> Result<LogicalPlan> {
105    let DataFrame::DataFusion(dataframe) = dataframe;
106
107    let start_timestamp_ms = q.start_timestamp_ms;
108    let end_timestamp_ms = q.end_timestamp_ms;
109
110    let label_matches = &q.matchers;
111
112    let mut conditions = Vec::with_capacity(label_matches.len() + 1);
113
114    conditions.push(col(greptime_timestamp()).gt_eq(lit_timestamp_millisecond(start_timestamp_ms)));
115    conditions.push(col(greptime_timestamp()).lt_eq(lit_timestamp_millisecond(end_timestamp_ms)));
116
117    for m in label_matches {
118        let name = &m.name;
119
120        if name == METRIC_NAME_LABEL || name == SCHEMA_LABEL || name == DATABASE_LABEL {
121            continue;
122        }
123
124        let value = &m.value;
125        let m_type = MatcherType::try_from(m.r#type).map_err(|e| {
126            error::InvalidPromRemoteRequestSnafu {
127                msg: format!("invalid LabelMatcher type, decode error: {e}",),
128            }
129            .build()
130        })?;
131
132        match m_type {
133            MatcherType::Eq => {
134                conditions.push(col(name).eq(lit(value)));
135            }
136            MatcherType::Neq => {
137                conditions.push(col(name).not_eq(lit(value)));
138            }
139            // Case sensitive regexp match
140            MatcherType::Re => {
141                conditions.push(regexp_match(col(name), lit(value), None).is_not_null());
142            }
143            // Case sensitive regexp not match
144            MatcherType::Nre => {
145                conditions.push(regexp_match(col(name), lit(value), None).is_null());
146            }
147        }
148    }
149
150    // Safety: conditions MUST not be empty, reduce always return Some(expr).
151    let conditions = conditions.into_iter().reduce(Expr::and).unwrap();
152
153    let dataframe = dataframe
154        .filter(conditions)
155        .context(error::DataFrameSnafu)?;
156
157    Ok(dataframe.into_parts().1)
158}
159
160#[inline]
161fn new_label(name: String, value: String) -> Label {
162    Label { name, value }
163}
164
165fn lit_timestamp_millisecond(ts: i64) -> Expr {
166    Expr::Literal(ScalarValue::TimestampMillisecond(Some(ts), None), None)
167}
168
169// A timeseries id
170#[derive(Debug)]
171struct TimeSeriesId {
172    labels: Vec<Label>,
173}
174
175/// Because Label in protobuf doesn't impl `Eq`, so we have to do it by ourselves.
176impl PartialEq for TimeSeriesId {
177    fn eq(&self, other: &Self) -> bool {
178        if self.labels.len() != other.labels.len() {
179            return false;
180        }
181
182        self.labels
183            .iter()
184            .zip(other.labels.iter())
185            .all(|(l, r)| l.name == r.name && l.value == r.value)
186    }
187}
188impl Eq for TimeSeriesId {}
189
190impl Hash for TimeSeriesId {
191    fn hash<H: Hasher>(&self, state: &mut H) {
192        for label in &self.labels {
193            label.name.hash(state);
194            label.value.hash(state);
195        }
196    }
197}
198
199/// For Sorting timeseries
200impl Ord for TimeSeriesId {
201    fn cmp(&self, other: &Self) -> Ordering {
202        let ordering = self.labels.len().cmp(&other.labels.len());
203        if ordering != Ordering::Equal {
204            return ordering;
205        }
206
207        for (l, r) in self.labels.iter().zip(other.labels.iter()) {
208            let ordering = l.name.cmp(&r.name);
209
210            if ordering != Ordering::Equal {
211                return ordering;
212            }
213
214            let ordering = l.value.cmp(&r.value);
215
216            if ordering != Ordering::Equal {
217                return ordering;
218            }
219        }
220        Ordering::Equal
221    }
222}
223
224impl PartialOrd for TimeSeriesId {
225    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
226        Some(self.cmp(other))
227    }
228}
229
230/// Collect each row's timeseries id
231/// This processing is ugly, hope <https://github.com/GreptimeTeam/greptimedb/issues/336> making some progress in future.
232fn collect_timeseries_ids(table_name: &str, recordbatch: &RecordBatch) -> Vec<TimeSeriesId> {
233    let row_count = recordbatch.num_rows();
234    let mut timeseries_ids = Vec::with_capacity(row_count);
235
236    for row in 0..row_count {
237        let mut labels = Vec::with_capacity(recordbatch.num_columns() - 1);
238        labels.push(new_label(
239            METRIC_NAME_LABEL.to_string(),
240            table_name.to_string(),
241        ));
242
243        for (i, column_schema) in recordbatch.schema.column_schemas().iter().enumerate() {
244            if column_schema.name == greptime_value() || column_schema.name == greptime_timestamp()
245            {
246                continue;
247            }
248
249            let column = &recordbatch.columns()[i];
250            // A label with an empty label value is considered equivalent to a label that does not exist.
251            if column.is_null(row) {
252                continue;
253            }
254
255            let value = column.get(row).to_string();
256            labels.push(new_label(column_schema.name.clone(), value));
257        }
258        timeseries_ids.push(TimeSeriesId { labels });
259    }
260    timeseries_ids
261}
262
263pub fn recordbatches_to_timeseries(
264    table_name: &str,
265    recordbatches: RecordBatches,
266) -> Result<Vec<TimeSeries>> {
267    Ok(recordbatches
268        .take()
269        .into_iter()
270        .map(|x| recordbatch_to_timeseries(table_name, x))
271        .collect::<Result<Vec<_>>>()?
272        .into_iter()
273        .flatten()
274        .collect())
275}
276
277fn recordbatch_to_timeseries(table: &str, recordbatch: RecordBatch) -> Result<Vec<TimeSeries>> {
278    let ts_column = recordbatch.column_by_name(greptime_timestamp()).context(
279        error::InvalidPromRemoteReadQueryResultSnafu {
280            msg: "missing greptime_timestamp column in query result",
281        },
282    )?;
283    ensure!(
284        ts_column.data_type() == ConcreteDataType::timestamp_millisecond_datatype(),
285        error::InvalidPromRemoteReadQueryResultSnafu {
286            msg: format!(
287                "Expect timestamp column of datatype Timestamp(Millisecond), actual {:?}",
288                ts_column.data_type()
289            )
290        }
291    );
292
293    let field_column = recordbatch.column_by_name(greptime_value()).context(
294        error::InvalidPromRemoteReadQueryResultSnafu {
295            msg: "missing greptime_value column in query result",
296        },
297    )?;
298    ensure!(
299        field_column.data_type() == ConcreteDataType::float64_datatype(),
300        error::InvalidPromRemoteReadQueryResultSnafu {
301            msg: format!(
302                "Expect value column of datatype Float64, actual {:?}",
303                field_column.data_type()
304            )
305        }
306    );
307
308    // First, collect each row's timeseries id
309    let timeseries_ids = collect_timeseries_ids(table, &recordbatch);
310    // Then, group timeseries by it's id.
311    let mut timeseries_map: BTreeMap<&TimeSeriesId, TimeSeries> = BTreeMap::default();
312
313    for (row, timeseries_id) in timeseries_ids.iter().enumerate() {
314        let timeseries = timeseries_map
315            .entry(timeseries_id)
316            .or_insert_with(|| TimeSeries {
317                labels: timeseries_id.labels.clone(),
318                ..Default::default()
319            });
320
321        if ts_column.is_null(row) || field_column.is_null(row) {
322            continue;
323        }
324
325        let value: f64 = match field_column.get(row) {
326            Value::Float64(value) => value.into(),
327            _ => unreachable!("checked by the \"ensure\" above"),
328        };
329        let timestamp = match ts_column.get(row) {
330            Value::Timestamp(t) if t.unit() == TimeUnit::Millisecond => t.value(),
331            _ => unreachable!("checked by the \"ensure\" above"),
332        };
333        let sample = Sample { value, timestamp };
334
335        timeseries.samples.push(sample);
336    }
337
338    Ok(timeseries_map.into_values().collect())
339}
340
341pub fn to_grpc_row_insert_requests(request: &WriteRequest) -> Result<(RowInsertRequests, usize)> {
342    let _timer = crate::metrics::METRIC_HTTP_PROM_STORE_CONVERT_ELAPSED.start_timer();
343
344    let mut multi_table_data = MultiTableData::new();
345
346    for series in &request.timeseries {
347        let table_name = &series
348            .labels
349            .iter()
350            .find(|label| {
351                // The metric name is a special label
352                label.name == METRIC_NAME_LABEL
353            })
354            .context(error::InvalidPromRemoteRequestSnafu {
355                msg: "missing '__name__' label in time-series",
356            })?
357            .value;
358
359        // The metric name is a special label,
360        // num_columns = labels.len() - 1 + 1 (value) + 1 (timestamp)
361        let num_columns = series.labels.len() + 1;
362
363        let table_data = multi_table_data.get_or_default_table_data(
364            table_name,
365            num_columns,
366            series.samples.len(),
367        );
368
369        // labels
370        let kvs = series.labels.iter().filter_map(|label| {
371            if label.name == METRIC_NAME_LABEL {
372                None
373            } else {
374                Some((label.name.clone(), label.value.clone()))
375            }
376        });
377
378        if series.samples.len() == 1 {
379            let mut one_row = table_data.alloc_one_row();
380
381            row_writer::write_tags(table_data, kvs, &mut one_row)?;
382            // value
383            row_writer::write_f64(
384                table_data,
385                greptime_value(),
386                series.samples[0].value,
387                &mut one_row,
388            )?;
389            // timestamp
390            row_writer::write_ts_to_millis(
391                table_data,
392                greptime_timestamp(),
393                Some(series.samples[0].timestamp),
394                Precision::Millisecond,
395                &mut one_row,
396            )?;
397
398            table_data.add_row(one_row);
399        } else {
400            for Sample { value, timestamp } in &series.samples {
401                let mut one_row = table_data.alloc_one_row();
402
403                // labels
404                let kvs = kvs.clone();
405                row_writer::write_tags(table_data, kvs, &mut one_row)?;
406                // value
407                row_writer::write_f64(table_data, greptime_value(), *value, &mut one_row)?;
408                // timestamp
409                row_writer::write_ts_to_millis(
410                    table_data,
411                    greptime_timestamp(),
412                    Some(*timestamp),
413                    Precision::Millisecond,
414                    &mut one_row,
415                )?;
416
417                table_data.add_row(one_row);
418            }
419        }
420    }
421
422    Ok(multi_table_data.into_row_insert_requests())
423}
424
425#[inline]
426pub fn snappy_decompress(buf: &[u8]) -> Result<Vec<u8>> {
427    let mut decoder = Decoder::new();
428    decoder
429        .decompress_vec(buf)
430        .context(error::DecompressSnappyPromRemoteRequestSnafu)
431}
432
433#[inline]
434pub fn snappy_compress(buf: &[u8]) -> Result<Vec<u8>> {
435    let mut encoder = Encoder::new();
436    encoder
437        .compress_vec(buf)
438        .context(error::CompressPromRemoteRequestSnafu)
439}
440
441#[inline]
442pub fn zstd_decompress(buf: &[u8]) -> Result<Vec<u8>> {
443    zstd::stream::decode_all(buf).context(error::DecompressZstdPromRemoteRequestSnafu)
444}
445
446/// Mock timeseries for test, it is both used in servers and frontend crate
447/// So we present it here
448pub fn mock_timeseries() -> Vec<TimeSeries> {
449    vec![
450        TimeSeries {
451            labels: vec![
452                new_label(METRIC_NAME_LABEL.to_string(), "metric1".to_string()),
453                new_label("job".to_string(), "spark".to_string()),
454            ],
455            samples: vec![
456                Sample {
457                    value: 1.0f64,
458                    timestamp: 1000,
459                },
460                Sample {
461                    value: 2.0f64,
462                    timestamp: 2000,
463                },
464            ],
465            ..Default::default()
466        },
467        TimeSeries {
468            labels: vec![
469                new_label(METRIC_NAME_LABEL.to_string(), "metric2".to_string()),
470                new_label("instance".to_string(), "test_host1".to_string()),
471                new_label("idc".to_string(), "z001".to_string()),
472            ],
473            samples: vec![
474                Sample {
475                    value: 3.0f64,
476                    timestamp: 1000,
477                },
478                Sample {
479                    value: 4.0f64,
480                    timestamp: 2000,
481                },
482            ],
483            ..Default::default()
484        },
485        TimeSeries {
486            labels: vec![
487                new_label(METRIC_NAME_LABEL.to_string(), "metric3".to_string()),
488                new_label("idc".to_string(), "z002".to_string()),
489                new_label("app".to_string(), "biz".to_string()),
490            ],
491            samples: vec![
492                Sample {
493                    value: 5.0f64,
494                    timestamp: 1000,
495                },
496                Sample {
497                    value: 6.0f64,
498                    timestamp: 2000,
499                },
500                Sample {
501                    value: 7.0f64,
502                    timestamp: 3000,
503                },
504            ],
505            ..Default::default()
506        },
507    ]
508}
509
510/// Add new labels to the mock timeseries.
511pub fn mock_timeseries_new_label() -> Vec<TimeSeries> {
512    let ts_demo_metrics = TimeSeries {
513        labels: vec![
514            new_label(METRIC_NAME_LABEL.to_string(), "demo_metrics".to_string()),
515            new_label("idc".to_string(), "idc3".to_string()),
516            new_label("new_label1".to_string(), "foo".to_string()),
517        ],
518        samples: vec![Sample {
519            value: 42.0,
520            timestamp: 3000,
521        }],
522        ..Default::default()
523    };
524    let ts_multi_labels = TimeSeries {
525        labels: vec![
526            new_label(METRIC_NAME_LABEL.to_string(), "metric1".to_string()),
527            new_label("idc".to_string(), "idc4".to_string()),
528            new_label("env".to_string(), "prod".to_string()),
529            new_label("host".to_string(), "host9".to_string()),
530            new_label("new_label2".to_string(), "bar".to_string()),
531        ],
532        samples: vec![Sample {
533            value: 99.0,
534            timestamp: 4000,
535        }],
536        ..Default::default()
537    };
538
539    vec![ts_demo_metrics, ts_multi_labels]
540}
541
542/// Add new labels to the mock timeseries.
543pub fn mock_timeseries_special_labels() -> Vec<TimeSeries> {
544    let idc3_schema = TimeSeries {
545        labels: vec![
546            new_label(METRIC_NAME_LABEL.to_string(), "idc3_lo_table".to_string()),
547            new_label("__database__".to_string(), "idc3".to_string()),
548            new_label("__physical_table__".to_string(), "f1".to_string()),
549        ],
550        samples: vec![Sample {
551            value: 42.0,
552            timestamp: 3000,
553        }],
554        ..Default::default()
555    };
556    let idc4_schema = TimeSeries {
557        labels: vec![
558            new_label(
559                METRIC_NAME_LABEL.to_string(),
560                "idc4_local_table".to_string(),
561            ),
562            new_label("__database__".to_string(), "idc4".to_string()),
563            new_label("__physical_table__".to_string(), "f2".to_string()),
564        ],
565        samples: vec![Sample {
566            value: 99.0,
567            timestamp: 4000,
568        }],
569        ..Default::default()
570    };
571
572    vec![idc3_schema, idc4_schema]
573}
574
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use api::prom_store::remote::LabelMatcher;
    use api::v1::{ColumnDataType, Row, SemanticType};
    use datafusion::prelude::SessionContext;
    use datatypes::schema::{ColumnSchema, Schema};
    use datatypes::vectors::{Float64Vector, StringVector, TimestampMillisecondVector};
    use table::table::adapter::DfTableProviderAdapter;
    use table::test_util::MemTable;

    use super::*;

    // Integer values stored in `LabelMatcher.r#type` for each matcher kind.
    const EQ_TYPE: i32 = MatcherType::Eq as i32;
    const NEQ_TYPE: i32 = MatcherType::Neq as i32;
    const RE_TYPE: i32 = MatcherType::Re as i32;

    /// `table_name` errors without a `__name__` matcher and returns its value otherwise.
    #[test]
    fn test_table_name() {
        let q = Query {
            start_timestamp_ms: 1000,
            end_timestamp_ms: 2000,
            matchers: vec![],
            ..Default::default()
        };
        let err = table_name(&q).unwrap_err();
        assert!(matches!(err, error::Error::InvalidPromRemoteRequest { .. }));

        let q = Query {
            start_timestamp_ms: 1000,
            end_timestamp_ms: 2000,
            matchers: vec![LabelMatcher {
                name: METRIC_NAME_LABEL.to_string(),
                value: "test".to_string(),
                r#type: EQ_TYPE,
            }],
            ..Default::default()
        };
        assert_eq!("test", table_name(&q).unwrap());
    }

    /// `query_to_plan` always filters on the time range. A `__name__`-only query adds
    /// nothing else, while label matchers are ANDed after the range in matcher order.
    #[test]
    fn test_query_to_plan() {
        // Only `__name__`, which selects the table and yields no row filter.
        let q = Query {
            start_timestamp_ms: 1000,
            end_timestamp_ms: 2000,
            matchers: vec![LabelMatcher {
                name: METRIC_NAME_LABEL.to_string(),
                value: "test".to_string(),
                r#type: EQ_TYPE,
            }],
            ..Default::default()
        };

        let schema = Arc::new(Schema::new(vec![
            ColumnSchema::new(
                greptime_timestamp(),
                ConcreteDataType::timestamp_millisecond_datatype(),
                true,
            ),
            ColumnSchema::new(greptime_value(), ConcreteDataType::float64_datatype(), true),
            ColumnSchema::new("instance", ConcreteDataType::string_datatype(), true),
            ColumnSchema::new("job", ConcreteDataType::string_datatype(), true),
        ]));
        let recordbatch = RecordBatch::new(
            schema,
            vec![
                Arc::new(TimestampMillisecondVector::from_vec(vec![1000])) as _,
                Arc::new(Float64Vector::from_vec(vec![3.0])) as _,
                Arc::new(StringVector::from(vec!["host1"])) as _,
                Arc::new(StringVector::from(vec!["job"])) as _,
            ],
        )
        .unwrap();

        let ctx = SessionContext::new();
        let table = MemTable::table("test", recordbatch);
        let table_provider = Arc::new(DfTableProviderAdapter::new(table));

        let dataframe = ctx.read_table(table_provider.clone()).unwrap();
        let plan = query_to_plan(DataFrame::DataFusion(dataframe), &q).unwrap();
        let display_string = format!("{}", plan.display_indent());

        let ts_col = greptime_timestamp();
        let expected = format!(
            "Filter: ?table?.{} >= TimestampMillisecond(1000, None) AND ?table?.{} <= TimestampMillisecond(2000, None)\n  TableScan: ?table?",
            ts_col, ts_col
        );
        assert_eq!(expected, display_string);

        // Regex and not-equal matchers are appended after the time range.
        // This test only inspects the plan's display, so the regex pattern is not exercised.
        let q = Query {
            start_timestamp_ms: 1000,
            end_timestamp_ms: 2000,
            matchers: vec![
                LabelMatcher {
                    name: METRIC_NAME_LABEL.to_string(),
                    value: "test".to_string(),
                    r#type: EQ_TYPE,
                },
                LabelMatcher {
                    name: "job".to_string(),
                    value: "*prom*".to_string(),
                    r#type: RE_TYPE,
                },
                LabelMatcher {
                    name: "instance".to_string(),
                    value: "localhost".to_string(),
                    r#type: NEQ_TYPE,
                },
            ],
            ..Default::default()
        };

        let dataframe = ctx.read_table(table_provider).unwrap();
        let plan = query_to_plan(DataFrame::DataFusion(dataframe), &q).unwrap();
        let display_string = format!("{}", plan.display_indent());

        let ts_col = greptime_timestamp();
        let expected = format!(
            "Filter: ?table?.{} >= TimestampMillisecond(1000, None) AND ?table?.{} <= TimestampMillisecond(2000, None) AND regexp_match(?table?.job, Utf8(\"*prom*\")) IS NOT NULL AND ?table?.instance != Utf8(\"localhost\")\n  TableScan: ?table?",
            ts_col, ts_col
        );
        assert_eq!(expected, display_string);
    }

    /// Builds the expected gRPC column schemas: the given tag columns followed by the
    /// `greptime_value` (Float64 field) and `greptime_timestamp` (millisecond timestamp) columns.
    fn column_schemas_with(
        mut kts_iter: Vec<(&str, ColumnDataType, SemanticType)>,
    ) -> Vec<api::v1::ColumnSchema> {
        kts_iter.push((
            greptime_value(),
            ColumnDataType::Float64,
            SemanticType::Field,
        ));
        kts_iter.push((
            greptime_timestamp(),
            ColumnDataType::TimestampMillisecond,
            SemanticType::Timestamp,
        ));

        kts_iter
            .into_iter()
            .map(|(k, t, s)| api::v1::ColumnSchema {
                column_name: k.to_string(),
                datatype: t as i32,
                semantic_type: s as i32,
                ..Default::default()
            })
            .collect()
    }

    /// Expected row for a table with one tag: `[tag, value, timestamp]`.
    fn make_row_with_label(l1: &str, value: f64, timestamp: i64) -> Row {
        Row {
            values: vec![
                api::v1::Value {
                    value_data: Some(api::v1::value::ValueData::StringValue(l1.to_string())),
                },
                api::v1::Value {
                    value_data: Some(api::v1::value::ValueData::F64Value(value)),
                },
                api::v1::Value {
                    value_data: Some(api::v1::value::ValueData::TimestampMillisecondValue(
                        timestamp,
                    )),
                },
            ],
        }
    }

    /// Expected row for a table with two tags: `[tag1, tag2, value, timestamp]`.
    fn make_row_with_2_labels(l1: &str, l2: &str, value: f64, timestamp: i64) -> Row {
        Row {
            values: vec![
                api::v1::Value {
                    value_data: Some(api::v1::value::ValueData::StringValue(l1.to_string())),
                },
                api::v1::Value {
                    value_data: Some(api::v1::value::ValueData::StringValue(l2.to_string())),
                },
                api::v1::Value {
                    value_data: Some(api::v1::value::ValueData::F64Value(value)),
                },
                api::v1::Value {
                    value_data: Some(api::v1::value::ValueData::TimestampMillisecondValue(
                        timestamp,
                    )),
                },
            ],
        }
    }

    /// `to_grpc_row_insert_requests` produces one insert per metric. Non-`__name__`
    /// labels become tag columns in label order, and each sample becomes one row.
    #[test]
    fn test_write_request_to_row_insert_exprs() {
        let write_request = WriteRequest {
            timeseries: mock_timeseries(),
            ..Default::default()
        };

        let mut exprs = to_grpc_row_insert_requests(&write_request)
            .unwrap()
            .0
            .inserts;
        // Insert order is not asserted; sort by table name for stable comparison.
        exprs.sort_unstable_by(|l, r| l.table_name.cmp(&r.table_name));
        assert_eq!(3, exprs.len());
        assert_eq!("metric1", exprs[0].table_name);
        assert_eq!("metric2", exprs[1].table_name);
        assert_eq!("metric3", exprs[2].table_name);

        let rows = exprs[0].rows.as_ref().unwrap();
        let schema = &rows.schema;
        let rows = &rows.rows;
        assert_eq!(2, rows.len());
        assert_eq!(3, schema.len());
        assert_eq!(
            column_schemas_with(vec![("job", ColumnDataType::String, SemanticType::Tag)]),
            *schema
        );
        assert_eq!(
            &vec![
                make_row_with_label("spark", 1.0, 1000),
                make_row_with_label("spark", 2.0, 2000),
            ],
            rows
        );

        let rows = exprs[1].rows.as_ref().unwrap();
        let schema = &rows.schema;
        let rows = &rows.rows;
        assert_eq!(2, rows.len());
        assert_eq!(4, schema.len());
        assert_eq!(
            column_schemas_with(vec![
                ("instance", ColumnDataType::String, SemanticType::Tag),
                ("idc", ColumnDataType::String, SemanticType::Tag)
            ]),
            *schema
        );
        assert_eq!(
            &vec![
                make_row_with_2_labels("test_host1", "z001", 3.0, 1000),
                make_row_with_2_labels("test_host1", "z001", 4.0, 2000),
            ],
            rows
        );

        let rows = exprs[2].rows.as_ref().unwrap();
        let schema = &rows.schema;
        let rows = &rows.rows;
        assert_eq!(3, rows.len());
        assert_eq!(4, schema.len());
        assert_eq!(
            column_schemas_with(vec![
                ("idc", ColumnDataType::String, SemanticType::Tag),
                ("app", ColumnDataType::String, SemanticType::Tag)
            ]),
            *schema
        );
        assert_eq!(
            &vec![
                make_row_with_2_labels("z002", "biz", 5.0, 1000),
                make_row_with_2_labels("z002", "biz", 6.0, 2000),
                make_row_with_2_labels("z002", "biz", 7.0, 3000),
            ],
            rows
        );
    }

    /// Two single-row batches with different `instance` values yield two series. Each
    /// series is labelled `__name__` plus `instance` and holds that batch's sample.
    #[test]
    fn test_recordbatches_to_timeseries() {
        let schema = Arc::new(Schema::new(vec![
            ColumnSchema::new(
                greptime_timestamp(),
                ConcreteDataType::timestamp_millisecond_datatype(),
                true,
            ),
            ColumnSchema::new(greptime_value(), ConcreteDataType::float64_datatype(), true),
            ColumnSchema::new("instance", ConcreteDataType::string_datatype(), true),
        ]));

        let recordbatches = RecordBatches::try_new(
            schema.clone(),
            vec![
                RecordBatch::new(
                    schema.clone(),
                    vec![
                        Arc::new(TimestampMillisecondVector::from_vec(vec![1000])) as _,
                        Arc::new(Float64Vector::from_vec(vec![3.0])) as _,
                        Arc::new(StringVector::from(vec!["host1"])) as _,
                    ],
                )
                .unwrap(),
                RecordBatch::new(
                    schema,
                    vec![
                        Arc::new(TimestampMillisecondVector::from_vec(vec![2000])) as _,
                        Arc::new(Float64Vector::from_vec(vec![7.0])) as _,
                        Arc::new(StringVector::from(vec!["host2"])) as _,
                    ],
                )
                .unwrap(),
            ],
        )
        .unwrap();

        let timeseries = recordbatches_to_timeseries("metric1", recordbatches).unwrap();
        assert_eq!(2, timeseries.len());

        assert_eq!(
            vec![
                Label {
                    name: METRIC_NAME_LABEL.to_string(),
                    value: "metric1".to_string(),
                },
                Label {
                    name: "instance".to_string(),
                    value: "host1".to_string(),
                },
            ],
            timeseries[0].labels
        );

        assert_eq!(
            timeseries[0].samples,
            vec![Sample {
                value: 3.0,
                timestamp: 1000,
            }]
        );

        assert_eq!(
            vec![
                Label {
                    name: METRIC_NAME_LABEL.to_string(),
                    value: "metric1".to_string(),
                },
                Label {
                    name: "instance".to_string(),
                    value: "host2".to_string(),
                },
            ],
            timeseries[1].labels
        );
        assert_eq!(
            timeseries[1].samples,
            vec![Sample {
                value: 7.0,
                timestamp: 2000,
            }]
        );
    }
}