servers/
prom_store.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
//! Prometheus protocol support.
//! Handles the Prometheus remote_write and remote_read logic.
17use std::cmp::Ordering;
18use std::collections::BTreeMap;
19use std::hash::{Hash, Hasher};
20
21use api::prom_store::remote::label_matcher::Type as MatcherType;
22use api::prom_store::remote::{Label, Query, ReadRequest, Sample, TimeSeries, WriteRequest};
23use api::v1::RowInsertRequests;
24use arrow::array::{Array, AsArray};
25use arrow::datatypes::{Float64Type, TimestampMillisecondType};
26use common_grpc::precision::Precision;
27use common_query::prelude::{greptime_timestamp, greptime_value};
28use common_recordbatch::{RecordBatch, RecordBatches};
29use common_telemetry::tracing;
30use datafusion::prelude::{Expr, col, lit, regexp_match};
31use datafusion_common::ScalarValue;
32use datafusion_expr::LogicalPlan;
33use openmetrics_parser::{MetricsExposition, PrometheusType, PrometheusValue};
34use query::dataframe::DataFrame;
35use snafu::{OptionExt, ResultExt};
36use snap::raw::{Decoder, Encoder};
37
38use crate::error::{self, Result};
39use crate::row_writer::{self, MultiTableData};
40
/// Label holding the metric name; its value is used as the table name.
pub const METRIC_NAME_LABEL: &str = "__name__";
/// Byte-string form of [`METRIC_NAME_LABEL`].
pub const METRIC_NAME_LABEL_BYTES: &[u8] = METRIC_NAME_LABEL.as_bytes();

/// Label naming the target database; consulted only when no `__schema__`
/// equality matcher is present in a remote read request.
pub const DATABASE_LABEL: &str = "__database__";
/// Byte-string form of [`DATABASE_LABEL`].
pub const DATABASE_LABEL_BYTES: &[u8] = DATABASE_LABEL.as_bytes();

/// Label naming the target schema; takes priority over [`DATABASE_LABEL`].
pub const SCHEMA_LABEL: &str = "__schema__";
/// Byte-string form of [`SCHEMA_LABEL`].
pub const SCHEMA_LABEL_BYTES: &[u8] = SCHEMA_LABEL.as_bytes();

/// Label presumably naming the physical table behind a metric's table
/// (NOTE(review): consumer not visible in this file — confirm).
pub const PHYSICAL_TABLE_LABEL: &str = "__physical_table__";
/// Byte-string form of [`PHYSICAL_TABLE_LABEL`].
pub const PHYSICAL_TABLE_LABEL_BYTES: &[u8] = PHYSICAL_TABLE_LABEL.as_bytes();

/// The same as `FIELD_COLUMN_MATCHER` in `promql` crate
pub const FIELD_NAME_LABEL: &str = "__field__";
55
/// Metrics for push gateway protocol.
///
/// A thin wrapper around an `openmetrics_parser` exposition value.
pub struct Metrics {
    /// The exposition, typed with Prometheus metric types and values.
    /// NOTE(review): the code that fills this is not in this file — confirm which parser entry point produces it.
    pub exposition: MetricsExposition<PrometheusType, PrometheusValue>,
}
60
61/// Get table name from remote query
62pub fn table_name(q: &Query) -> Result<String> {
63    let label_matches = &q.matchers;
64
65    label_matches
66        .iter()
67        .find_map(|m| {
68            if m.name == METRIC_NAME_LABEL {
69                Some(m.value.clone())
70            } else {
71                None
72            }
73        })
74        .context(error::InvalidPromRemoteRequestSnafu {
75            msg: "missing '__name__' label in timeseries",
76        })
77}
78
79/// Extract schema from remote read request. Returns the first schema found from any query's matchers.
80/// Prioritizes __schema__ over __database__ labels.
81pub fn extract_schema_from_read_request(request: &ReadRequest) -> Option<String> {
82    for query in &request.queries {
83        for matcher in &query.matchers {
84            if matcher.name == SCHEMA_LABEL && matcher.r#type == MatcherType::Eq as i32 {
85                return Some(matcher.value.clone());
86            }
87        }
88    }
89
90    // If no __schema__ found, look for __database__
91    for query in &request.queries {
92        for matcher in &query.matchers {
93            if matcher.name == DATABASE_LABEL && matcher.r#type == MatcherType::Eq as i32 {
94                return Some(matcher.value.clone());
95            }
96        }
97    }
98
99    None
100}
101
102/// Create a DataFrame from a remote Query
103#[tracing::instrument(skip_all)]
104pub fn query_to_plan(dataframe: DataFrame, q: &Query) -> Result<LogicalPlan> {
105    let DataFrame::DataFusion(dataframe) = dataframe;
106
107    let start_timestamp_ms = q.start_timestamp_ms;
108    let end_timestamp_ms = q.end_timestamp_ms;
109
110    let label_matches = &q.matchers;
111
112    let mut conditions = Vec::with_capacity(label_matches.len() + 1);
113
114    conditions.push(col(greptime_timestamp()).gt_eq(lit_timestamp_millisecond(start_timestamp_ms)));
115    conditions.push(col(greptime_timestamp()).lt_eq(lit_timestamp_millisecond(end_timestamp_ms)));
116
117    for m in label_matches {
118        let name = &m.name;
119
120        if name == METRIC_NAME_LABEL || name == SCHEMA_LABEL || name == DATABASE_LABEL {
121            continue;
122        }
123
124        let value = &m.value;
125        let m_type = MatcherType::try_from(m.r#type).map_err(|e| {
126            error::InvalidPromRemoteRequestSnafu {
127                msg: format!("invalid LabelMatcher type, decode error: {e}",),
128            }
129            .build()
130        })?;
131
132        match m_type {
133            MatcherType::Eq => {
134                conditions.push(col(name).eq(lit(value)));
135            }
136            MatcherType::Neq => {
137                conditions.push(col(name).not_eq(lit(value)));
138            }
139            // Case sensitive regexp match
140            MatcherType::Re => {
141                conditions.push(regexp_match(col(name), lit(value), None).is_not_null());
142            }
143            // Case sensitive regexp not match
144            MatcherType::Nre => {
145                conditions.push(regexp_match(col(name), lit(value), None).is_null());
146            }
147        }
148    }
149
150    // Safety: conditions MUST not be empty, reduce always return Some(expr).
151    let conditions = conditions.into_iter().reduce(Expr::and).unwrap();
152
153    let dataframe = dataframe
154        .filter(conditions)
155        .context(error::DataFrameSnafu)?;
156
157    Ok(dataframe.into_parts().1)
158}
159
160#[inline]
161fn new_label(name: String, value: String) -> Label {
162    Label { name, value }
163}
164
165fn lit_timestamp_millisecond(ts: i64) -> Expr {
166    Expr::Literal(ScalarValue::TimestampMillisecond(Some(ts), None), None)
167}
168
169// A timeseries id
170#[derive(Debug)]
171struct TimeSeriesId {
172    labels: Vec<Label>,
173}
174
175/// Because Label in protobuf doesn't impl `Eq`, so we have to do it by ourselves.
176impl PartialEq for TimeSeriesId {
177    fn eq(&self, other: &Self) -> bool {
178        if self.labels.len() != other.labels.len() {
179            return false;
180        }
181
182        self.labels
183            .iter()
184            .zip(other.labels.iter())
185            .all(|(l, r)| l.name == r.name && l.value == r.value)
186    }
187}
188impl Eq for TimeSeriesId {}
189
190impl Hash for TimeSeriesId {
191    fn hash<H: Hasher>(&self, state: &mut H) {
192        for label in &self.labels {
193            label.name.hash(state);
194            label.value.hash(state);
195        }
196    }
197}
198
199/// For Sorting timeseries
200impl Ord for TimeSeriesId {
201    fn cmp(&self, other: &Self) -> Ordering {
202        let ordering = self.labels.len().cmp(&other.labels.len());
203        if ordering != Ordering::Equal {
204            return ordering;
205        }
206
207        for (l, r) in self.labels.iter().zip(other.labels.iter()) {
208            let ordering = l.name.cmp(&r.name);
209
210            if ordering != Ordering::Equal {
211                return ordering;
212            }
213
214            let ordering = l.value.cmp(&r.value);
215
216            if ordering != Ordering::Equal {
217                return ordering;
218            }
219        }
220        Ordering::Equal
221    }
222}
223
224impl PartialOrd for TimeSeriesId {
225    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
226        Some(self.cmp(other))
227    }
228}
229
230/// Collect each row's timeseries id
231/// This processing is ugly, hope <https://github.com/GreptimeTeam/greptimedb/issues/336> making some progress in future.
232fn collect_timeseries_ids(table_name: &str, recordbatch: &RecordBatch) -> Vec<TimeSeriesId> {
233    let row_count = recordbatch.num_rows();
234    let mut timeseries_ids = Vec::with_capacity(row_count);
235
236    let column_names = recordbatch
237        .schema
238        .column_schemas()
239        .iter()
240        .map(|column_schema| &column_schema.name);
241    let columns = column_names
242        .enumerate()
243        .filter(|(_, column_name)| {
244            *column_name != greptime_timestamp() && *column_name != greptime_value()
245        })
246        .map(|(i, column_name)| {
247            (
248                column_name,
249                recordbatch.iter_column_as_string(i).collect::<Vec<_>>(),
250            )
251        })
252        .collect::<Vec<_>>();
253
254    for row in 0..row_count {
255        let mut labels = Vec::with_capacity(recordbatch.num_columns() - 1);
256        labels.push(new_label(
257            METRIC_NAME_LABEL.to_string(),
258            table_name.to_string(),
259        ));
260
261        for (column_name, column_values) in columns.iter() {
262            if let Some(value) = &column_values[row] {
263                labels.push(new_label((*column_name).clone(), value.clone()));
264            }
265        }
266        timeseries_ids.push(TimeSeriesId { labels });
267    }
268    timeseries_ids
269}
270
271pub fn recordbatches_to_timeseries(
272    table_name: &str,
273    recordbatches: RecordBatches,
274) -> Result<Vec<TimeSeries>> {
275    Ok(recordbatches
276        .take()
277        .into_iter()
278        .map(|x| recordbatch_to_timeseries(table_name, x))
279        .collect::<Result<Vec<_>>>()?
280        .into_iter()
281        .flatten()
282        .collect())
283}
284
285fn recordbatch_to_timeseries(table: &str, recordbatch: RecordBatch) -> Result<Vec<TimeSeries>> {
286    let ts_column = recordbatch.column_by_name(greptime_timestamp()).context(
287        error::InvalidPromRemoteReadQueryResultSnafu {
288            msg: "missing greptime_timestamp column in query result",
289        },
290    )?;
291    let ts_column = ts_column
292        .as_primitive_opt::<TimestampMillisecondType>()
293        .with_context(|| error::InvalidPromRemoteReadQueryResultSnafu {
294            msg: format!(
295                "Expect timestamp column of datatype Timestamp(Millisecond), actual {:?}",
296                ts_column.data_type()
297            ),
298        })?;
299
300    let field_column = recordbatch.column_by_name(greptime_value()).context(
301        error::InvalidPromRemoteReadQueryResultSnafu {
302            msg: "missing greptime_value column in query result",
303        },
304    )?;
305    let field_column = field_column
306        .as_primitive_opt::<Float64Type>()
307        .with_context(|| error::InvalidPromRemoteReadQueryResultSnafu {
308            msg: format!(
309                "Expect value column of datatype Float64, actual {:?}",
310                field_column.data_type()
311            ),
312        })?;
313
314    // First, collect each row's timeseries id
315    let timeseries_ids = collect_timeseries_ids(table, &recordbatch);
316    // Then, group timeseries by it's id.
317    let mut timeseries_map: BTreeMap<&TimeSeriesId, TimeSeries> = BTreeMap::default();
318
319    for (row, timeseries_id) in timeseries_ids.iter().enumerate() {
320        let timeseries = timeseries_map
321            .entry(timeseries_id)
322            .or_insert_with(|| TimeSeries {
323                labels: timeseries_id.labels.clone(),
324                ..Default::default()
325            });
326
327        if ts_column.is_null(row) || field_column.is_null(row) {
328            continue;
329        }
330
331        let value = field_column.value(row);
332        let timestamp = ts_column.value(row);
333        let sample = Sample { value, timestamp };
334
335        timeseries.samples.push(sample);
336    }
337
338    Ok(timeseries_map.into_values().collect())
339}
340
341pub fn to_grpc_row_insert_requests(request: &WriteRequest) -> Result<(RowInsertRequests, usize)> {
342    let _timer = crate::metrics::METRIC_HTTP_PROM_STORE_CONVERT_ELAPSED.start_timer();
343
344    let mut multi_table_data = MultiTableData::new();
345
346    for series in &request.timeseries {
347        let table_name = &series
348            .labels
349            .iter()
350            .find(|label| {
351                // The metric name is a special label
352                label.name == METRIC_NAME_LABEL
353            })
354            .context(error::InvalidPromRemoteRequestSnafu {
355                msg: "missing '__name__' label in time-series",
356            })?
357            .value;
358
359        // The metric name is a special label,
360        // num_columns = labels.len() - 1 + 1 (value) + 1 (timestamp)
361        let num_columns = series.labels.len() + 1;
362
363        let table_data = multi_table_data.get_or_default_table_data(
364            table_name,
365            num_columns,
366            series.samples.len(),
367        );
368
369        // labels
370        let kvs = series.labels.iter().filter_map(|label| {
371            if label.name == METRIC_NAME_LABEL {
372                None
373            } else {
374                Some((label.name.clone(), label.value.clone()))
375            }
376        });
377
378        if series.samples.len() == 1 {
379            let mut one_row = table_data.alloc_one_row();
380
381            row_writer::write_tags(table_data, kvs, &mut one_row)?;
382            // value
383            row_writer::write_f64(
384                table_data,
385                greptime_value(),
386                series.samples[0].value,
387                &mut one_row,
388            )?;
389            // timestamp
390            row_writer::write_ts_to_millis(
391                table_data,
392                greptime_timestamp(),
393                Some(series.samples[0].timestamp),
394                Precision::Millisecond,
395                &mut one_row,
396            )?;
397
398            table_data.add_row(one_row);
399        } else {
400            for Sample { value, timestamp } in &series.samples {
401                let mut one_row = table_data.alloc_one_row();
402
403                // labels
404                let kvs = kvs.clone();
405                row_writer::write_tags(table_data, kvs, &mut one_row)?;
406                // value
407                row_writer::write_f64(table_data, greptime_value(), *value, &mut one_row)?;
408                // timestamp
409                row_writer::write_ts_to_millis(
410                    table_data,
411                    greptime_timestamp(),
412                    Some(*timestamp),
413                    Precision::Millisecond,
414                    &mut one_row,
415                )?;
416
417                table_data.add_row(one_row);
418            }
419        }
420    }
421
422    Ok(multi_table_data.into_row_insert_requests())
423}
424
425#[inline]
426pub fn snappy_decompress(buf: &[u8]) -> Result<Vec<u8>> {
427    let mut decoder = Decoder::new();
428    decoder
429        .decompress_vec(buf)
430        .context(error::DecompressSnappyPromRemoteRequestSnafu)
431}
432
433#[inline]
434pub fn snappy_compress(buf: &[u8]) -> Result<Vec<u8>> {
435    let mut encoder = Encoder::new();
436    encoder
437        .compress_vec(buf)
438        .context(error::CompressPromRemoteRequestSnafu)
439}
440
441#[inline]
442pub fn zstd_decompress(buf: &[u8]) -> Result<Vec<u8>> {
443    zstd::stream::decode_all(buf).context(error::DecompressZstdPromRemoteRequestSnafu)
444}
445
446/// Mock timeseries for test, it is both used in servers and frontend crate
447/// So we present it here
448pub fn mock_timeseries() -> Vec<TimeSeries> {
449    vec![
450        TimeSeries {
451            labels: vec![
452                new_label(METRIC_NAME_LABEL.to_string(), "metric1".to_string()),
453                new_label("job".to_string(), "spark".to_string()),
454            ],
455            samples: vec![
456                Sample {
457                    value: 1.0f64,
458                    timestamp: 1000,
459                },
460                Sample {
461                    value: 2.0f64,
462                    timestamp: 2000,
463                },
464            ],
465            ..Default::default()
466        },
467        TimeSeries {
468            labels: vec![
469                new_label(METRIC_NAME_LABEL.to_string(), "metric2".to_string()),
470                new_label("instance".to_string(), "test_host1".to_string()),
471                new_label("idc".to_string(), "z001".to_string()),
472            ],
473            samples: vec![
474                Sample {
475                    value: 3.0f64,
476                    timestamp: 1000,
477                },
478                Sample {
479                    value: 4.0f64,
480                    timestamp: 2000,
481                },
482            ],
483            ..Default::default()
484        },
485        TimeSeries {
486            labels: vec![
487                new_label(METRIC_NAME_LABEL.to_string(), "metric3".to_string()),
488                new_label("idc".to_string(), "z002".to_string()),
489                new_label("app".to_string(), "biz".to_string()),
490            ],
491            samples: vec![
492                Sample {
493                    value: 5.0f64,
494                    timestamp: 1000,
495                },
496                Sample {
497                    value: 6.0f64,
498                    timestamp: 2000,
499                },
500                Sample {
501                    value: 7.0f64,
502                    timestamp: 3000,
503                },
504            ],
505            ..Default::default()
506        },
507    ]
508}
509
510/// Add new labels to the mock timeseries.
511pub fn mock_timeseries_new_label() -> Vec<TimeSeries> {
512    let ts_demo_metrics = TimeSeries {
513        labels: vec![
514            new_label(METRIC_NAME_LABEL.to_string(), "demo_metrics".to_string()),
515            new_label("idc".to_string(), "idc3".to_string()),
516            new_label("new_label1".to_string(), "foo".to_string()),
517        ],
518        samples: vec![Sample {
519            value: 42.0,
520            timestamp: 3000,
521        }],
522        ..Default::default()
523    };
524    let ts_multi_labels = TimeSeries {
525        labels: vec![
526            new_label(METRIC_NAME_LABEL.to_string(), "metric1".to_string()),
527            new_label("idc".to_string(), "idc4".to_string()),
528            new_label("env".to_string(), "prod".to_string()),
529            new_label("host".to_string(), "host9".to_string()),
530            new_label("new_label2".to_string(), "bar".to_string()),
531        ],
532        samples: vec![Sample {
533            value: 99.0,
534            timestamp: 4000,
535        }],
536        ..Default::default()
537    };
538
539    vec![ts_demo_metrics, ts_multi_labels]
540}
541
542/// Add new labels to the mock timeseries.
543pub fn mock_timeseries_special_labels() -> Vec<TimeSeries> {
544    let idc3_schema = TimeSeries {
545        labels: vec![
546            new_label(METRIC_NAME_LABEL.to_string(), "idc3_lo_table".to_string()),
547            new_label("__database__".to_string(), "idc3".to_string()),
548            new_label("__physical_table__".to_string(), "f1".to_string()),
549        ],
550        samples: vec![Sample {
551            value: 42.0,
552            timestamp: 3000,
553        }],
554        ..Default::default()
555    };
556    let idc4_schema = TimeSeries {
557        labels: vec![
558            new_label(
559                METRIC_NAME_LABEL.to_string(),
560                "idc4_local_table".to_string(),
561            ),
562            new_label("__database__".to_string(), "idc4".to_string()),
563            new_label("__physical_table__".to_string(), "f2".to_string()),
564        ],
565        samples: vec![Sample {
566            value: 99.0,
567            timestamp: 4000,
568        }],
569        ..Default::default()
570    };
571
572    vec![idc3_schema, idc4_schema]
573}
574
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use api::prom_store::remote::LabelMatcher;
    use api::v1::{ColumnDataType, Row, SemanticType};
    use datafusion::prelude::SessionContext;
    use datatypes::data_type::ConcreteDataType;
    use datatypes::schema::{ColumnSchema, Schema};
    use datatypes::vectors::{Float64Vector, StringVector, TimestampMillisecondVector};
    use table::table::adapter::DfTableProviderAdapter;
    use table::test_util::MemTable;

    use super::*;

    // Raw `i32` encodings of matcher types, as stored in `LabelMatcher::r#type`.
    const EQ_TYPE: i32 = MatcherType::Eq as i32;
    const NEQ_TYPE: i32 = MatcherType::Neq as i32;
    const RE_TYPE: i32 = MatcherType::Re as i32;

    /// `table_name` fails without a `__name__` matcher and returns its value otherwise.
    #[test]
    fn test_table_name() {
        // No matchers at all: must be rejected as an invalid request.
        let q = Query {
            start_timestamp_ms: 1000,
            end_timestamp_ms: 2000,
            matchers: vec![],
            ..Default::default()
        };
        let err = table_name(&q).unwrap_err();
        assert!(matches!(err, error::Error::InvalidPromRemoteRequest { .. }));

        let q = Query {
            start_timestamp_ms: 1000,
            end_timestamp_ms: 2000,
            matchers: vec![LabelMatcher {
                name: METRIC_NAME_LABEL.to_string(),
                value: "test".to_string(),
                r#type: EQ_TYPE,
            }],
            ..Default::default()
        };
        assert_eq!("test", table_name(&q).unwrap());
    }

    /// Checks the textual form of the filter plan built by `query_to_plan`.
    #[test]
    fn test_query_to_plan() {
        // Only a `__name__` matcher: it selects the table, so the filter holds
        // just the two time-range bounds.
        let q = Query {
            start_timestamp_ms: 1000,
            end_timestamp_ms: 2000,
            matchers: vec![LabelMatcher {
                name: METRIC_NAME_LABEL.to_string(),
                value: "test".to_string(),
                r#type: EQ_TYPE,
            }],
            ..Default::default()
        };

        let schema = Arc::new(Schema::new(vec![
            ColumnSchema::new(
                greptime_timestamp(),
                ConcreteDataType::timestamp_millisecond_datatype(),
                true,
            ),
            ColumnSchema::new(greptime_value(), ConcreteDataType::float64_datatype(), true),
            ColumnSchema::new("instance", ConcreteDataType::string_datatype(), true),
            ColumnSchema::new("job", ConcreteDataType::string_datatype(), true),
        ]));
        let recordbatch = RecordBatch::new(
            schema,
            vec![
                Arc::new(TimestampMillisecondVector::from_vec(vec![1000])) as _,
                Arc::new(Float64Vector::from_vec(vec![3.0])) as _,
                Arc::new(StringVector::from(vec!["host1"])) as _,
                Arc::new(StringVector::from(vec!["job"])) as _,
            ],
        )
        .unwrap();

        let ctx = SessionContext::new();
        let table = MemTable::table("test", recordbatch);
        let table_provider = Arc::new(DfTableProviderAdapter::new(table));

        let dataframe = ctx.read_table(table_provider.clone()).unwrap();
        let plan = query_to_plan(DataFrame::DataFusion(dataframe), &q).unwrap();
        let display_string = format!("{}", plan.display_indent());

        let ts_col = greptime_timestamp();
        let expected = format!(
            "Filter: ?table?.{} >= TimestampMillisecond(1000, None) AND ?table?.{} <= TimestampMillisecond(2000, None)\n  TableScan: ?table?",
            ts_col, ts_col
        );
        assert_eq!(expected, display_string);

        // Regex and not-equal matchers are appended in matcher order.
        // `*prom*` is not a valid regex, but the plan is only displayed here, never executed.
        let q = Query {
            start_timestamp_ms: 1000,
            end_timestamp_ms: 2000,
            matchers: vec![
                LabelMatcher {
                    name: METRIC_NAME_LABEL.to_string(),
                    value: "test".to_string(),
                    r#type: EQ_TYPE,
                },
                LabelMatcher {
                    name: "job".to_string(),
                    value: "*prom*".to_string(),
                    r#type: RE_TYPE,
                },
                LabelMatcher {
                    name: "instance".to_string(),
                    value: "localhost".to_string(),
                    r#type: NEQ_TYPE,
                },
            ],
            ..Default::default()
        };

        let dataframe = ctx.read_table(table_provider).unwrap();
        let plan = query_to_plan(DataFrame::DataFusion(dataframe), &q).unwrap();
        let display_string = format!("{}", plan.display_indent());

        let ts_col = greptime_timestamp();
        let expected = format!(
            "Filter: ?table?.{} >= TimestampMillisecond(1000, None) AND ?table?.{} <= TimestampMillisecond(2000, None) AND regexp_match(?table?.job, Utf8(\"*prom*\")) IS NOT NULL AND ?table?.instance != Utf8(\"localhost\")\n  TableScan: ?table?",
            ts_col, ts_col
        );
        assert_eq!(expected, display_string);
    }

    /// Expected gRPC column schemas: the given tag columns followed by the
    /// Float64 value field and the millisecond timestamp column.
    fn column_schemas_with(
        mut kts_iter: Vec<(&str, ColumnDataType, SemanticType)>,
    ) -> Vec<api::v1::ColumnSchema> {
        kts_iter.push((
            greptime_value(),
            ColumnDataType::Float64,
            SemanticType::Field,
        ));
        kts_iter.push((
            greptime_timestamp(),
            ColumnDataType::TimestampMillisecond,
            SemanticType::Timestamp,
        ));

        kts_iter
            .into_iter()
            .map(|(k, t, s)| api::v1::ColumnSchema {
                column_name: k.to_string(),
                datatype: t as i32,
                semantic_type: s as i32,
                ..Default::default()
            })
            .collect()
    }

    /// Expected row with one string tag, then value, then timestamp.
    fn make_row_with_label(l1: &str, value: f64, timestamp: i64) -> Row {
        Row {
            values: vec![
                api::v1::Value {
                    value_data: Some(api::v1::value::ValueData::StringValue(l1.to_string())),
                },
                api::v1::Value {
                    value_data: Some(api::v1::value::ValueData::F64Value(value)),
                },
                api::v1::Value {
                    value_data: Some(api::v1::value::ValueData::TimestampMillisecondValue(
                        timestamp,
                    )),
                },
            ],
        }
    }

    /// Expected row with two string tags, then value, then timestamp.
    fn make_row_with_2_labels(l1: &str, l2: &str, value: f64, timestamp: i64) -> Row {
        Row {
            values: vec![
                api::v1::Value {
                    value_data: Some(api::v1::value::ValueData::StringValue(l1.to_string())),
                },
                api::v1::Value {
                    value_data: Some(api::v1::value::ValueData::StringValue(l2.to_string())),
                },
                api::v1::Value {
                    value_data: Some(api::v1::value::ValueData::F64Value(value)),
                },
                api::v1::Value {
                    value_data: Some(api::v1::value::ValueData::TimestampMillisecondValue(
                        timestamp,
                    )),
                },
            ],
        }
    }

    /// `mock_timeseries` yields one insert per metric, one row per sample, with
    /// tag columns in label order (excluding `__name__`).
    #[test]
    fn test_write_request_to_row_insert_exprs() {
        let write_request = WriteRequest {
            timeseries: mock_timeseries(),
            ..Default::default()
        };

        let mut exprs = to_grpc_row_insert_requests(&write_request)
            .unwrap()
            .0
            .inserts;
        // Sort by table name so the assertions don't depend on map iteration order.
        exprs.sort_unstable_by(|l, r| l.table_name.cmp(&r.table_name));
        assert_eq!(3, exprs.len());
        assert_eq!("metric1", exprs[0].table_name);
        assert_eq!("metric2", exprs[1].table_name);
        assert_eq!("metric3", exprs[2].table_name);

        let rows = exprs[0].rows.as_ref().unwrap();
        let schema = &rows.schema;
        let rows = &rows.rows;
        assert_eq!(2, rows.len());
        assert_eq!(3, schema.len());
        assert_eq!(
            column_schemas_with(vec![("job", ColumnDataType::String, SemanticType::Tag)]),
            *schema
        );
        assert_eq!(
            &vec![
                make_row_with_label("spark", 1.0, 1000),
                make_row_with_label("spark", 2.0, 2000),
            ],
            rows
        );

        let rows = exprs[1].rows.as_ref().unwrap();
        let schema = &rows.schema;
        let rows = &rows.rows;
        assert_eq!(2, rows.len());
        assert_eq!(4, schema.len());
        assert_eq!(
            column_schemas_with(vec![
                ("instance", ColumnDataType::String, SemanticType::Tag),
                ("idc", ColumnDataType::String, SemanticType::Tag)
            ]),
            *schema
        );
        assert_eq!(
            &vec![
                make_row_with_2_labels("test_host1", "z001", 3.0, 1000),
                make_row_with_2_labels("test_host1", "z001", 4.0, 2000),
            ],
            rows
        );

        let rows = exprs[2].rows.as_ref().unwrap();
        let schema = &rows.schema;
        let rows = &rows.rows;
        assert_eq!(3, rows.len());
        assert_eq!(4, schema.len());
        assert_eq!(
            column_schemas_with(vec![
                ("idc", ColumnDataType::String, SemanticType::Tag),
                ("app", ColumnDataType::String, SemanticType::Tag)
            ]),
            *schema
        );
        assert_eq!(
            &vec![
                make_row_with_2_labels("z002", "biz", 5.0, 1000),
                make_row_with_2_labels("z002", "biz", 6.0, 2000),
                make_row_with_2_labels("z002", "biz", 7.0, 3000),
            ],
            rows
        );
    }

    /// Two batches with distinct `instance` values produce two series, each
    /// labelled with `__name__` plus `instance` and holding one sample.
    #[test]
    fn test_recordbatches_to_timeseries() {
        let schema = Arc::new(Schema::new(vec![
            ColumnSchema::new(
                greptime_timestamp(),
                ConcreteDataType::timestamp_millisecond_datatype(),
                true,
            ),
            ColumnSchema::new(greptime_value(), ConcreteDataType::float64_datatype(), true),
            ColumnSchema::new("instance", ConcreteDataType::string_datatype(), true),
        ]));

        let recordbatches = RecordBatches::try_new(
            schema.clone(),
            vec![
                RecordBatch::new(
                    schema.clone(),
                    vec![
                        Arc::new(TimestampMillisecondVector::from_vec(vec![1000])) as _,
                        Arc::new(Float64Vector::from_vec(vec![3.0])) as _,
                        Arc::new(StringVector::from(vec!["host1"])) as _,
                    ],
                )
                .unwrap(),
                RecordBatch::new(
                    schema,
                    vec![
                        Arc::new(TimestampMillisecondVector::from_vec(vec![2000])) as _,
                        Arc::new(Float64Vector::from_vec(vec![7.0])) as _,
                        Arc::new(StringVector::from(vec!["host2"])) as _,
                    ],
                )
                .unwrap(),
            ],
        )
        .unwrap();

        let timeseries = recordbatches_to_timeseries("metric1", recordbatches).unwrap();
        assert_eq!(2, timeseries.len());

        assert_eq!(
            vec![
                Label {
                    name: METRIC_NAME_LABEL.to_string(),
                    value: "metric1".to_string(),
                },
                Label {
                    name: "instance".to_string(),
                    value: "host1".to_string(),
                },
            ],
            timeseries[0].labels
        );

        assert_eq!(
            timeseries[0].samples,
            vec![Sample {
                value: 3.0,
                timestamp: 1000,
            }]
        );

        assert_eq!(
            vec![
                Label {
                    name: METRIC_NAME_LABEL.to_string(),
                    value: "metric1".to_string(),
                },
                Label {
                    name: "instance".to_string(),
                    value: "host2".to_string(),
                },
            ],
            timeseries[1].labels
        );
        assert_eq!(
            timeseries[1].samples,
            vec![Sample {
                value: 7.0,
                timestamp: 2000,
            }]
        );
    }
}