servers/
prom_store.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
//! Prometheus protocol support:
//! handles the Prometheus `remote_write` and `remote_read` logic.
17use std::cmp::Ordering;
18use std::collections::BTreeMap;
19use std::hash::{Hash, Hasher};
20
21use api::prom_store::remote::label_matcher::Type as MatcherType;
22use api::prom_store::remote::{Label, Query, ReadRequest, Sample, TimeSeries, WriteRequest};
23use api::v1::RowInsertRequests;
24use arrow::array::{Array, AsArray};
25use arrow::datatypes::{Float64Type, TimestampMillisecondType};
26use common_grpc::precision::Precision;
27use common_query::prelude::{greptime_timestamp, greptime_value};
28use common_recordbatch::{RecordBatch, RecordBatches};
29use common_telemetry::{tracing, warn};
30use datafusion::dataframe::DataFrame;
31use datafusion::prelude::{Expr, col, lit, regexp_match};
32use datafusion_common::ScalarValue;
33use datafusion_expr::LogicalPlan;
34use openmetrics_parser::{MetricsExposition, PrometheusType, PrometheusValue};
35use snafu::{OptionExt, ResultExt};
36use snap::raw::{Decoder, Encoder};
37
38use crate::error::{self, Result};
39use crate::row_writer::{self, MultiTableData};
40
/// Name of the special label that carries the metric name.
pub const METRIC_NAME_LABEL: &str = "__name__";
/// Byte form of [`METRIC_NAME_LABEL`].
pub const METRIC_NAME_LABEL_BYTES: &[u8] = METRIC_NAME_LABEL.as_bytes();

/// Name of the special label that selects the target database.
pub const DATABASE_LABEL: &str = "__database__";
/// Byte form of [`DATABASE_LABEL`].
pub const DATABASE_LABEL_BYTES: &[u8] = DATABASE_LABEL.as_bytes();

/// Name of the special label that selects the target schema.
pub const SCHEMA_LABEL: &str = "__schema__";
/// Byte form of [`SCHEMA_LABEL`].
pub const SCHEMA_LABEL_BYTES: &[u8] = SCHEMA_LABEL.as_bytes();

/// Name of the special label for the physical table.
// NOTE(review): not consumed anywhere in this file; presumably interpreted by callers.
pub const PHYSICAL_TABLE_LABEL: &str = "__physical_table__";
/// Byte form of [`PHYSICAL_TABLE_LABEL`].
pub const PHYSICAL_TABLE_LABEL_BYTES: &[u8] = PHYSICAL_TABLE_LABEL.as_bytes();

/// The same as `FIELD_COLUMN_MATCHER` in `promql` crate
pub const FIELD_NAME_LABEL: &str = "__field__";
55
/// Metrics for push gateway protocol
///
/// Wraps a `MetricsExposition` from `openmetrics_parser`.
pub struct Metrics {
    /// The exposition, parameterized with the Prometheus metric type and value.
    pub exposition: MetricsExposition<PrometheusType, PrometheusValue>,
}
60
61/// Get table name from remote query
62pub fn table_name(q: &Query) -> Result<String> {
63    let label_matches = &q.matchers;
64
65    label_matches
66        .iter()
67        .find_map(|m| {
68            if m.name == METRIC_NAME_LABEL {
69                Some(m.value.clone())
70            } else {
71                None
72            }
73        })
74        .context(error::InvalidPromRemoteRequestSnafu {
75            msg: "missing '__name__' label in timeseries",
76        })
77}
78
79/// Extract schema from remote read request. Returns the first schema found from any query's matchers.
80/// Prioritizes __schema__ over __database__ labels.
81pub fn extract_schema_from_read_request(request: &ReadRequest) -> Option<String> {
82    for query in &request.queries {
83        for matcher in &query.matchers {
84            if matcher.name == SCHEMA_LABEL && matcher.r#type == MatcherType::Eq as i32 {
85                return Some(matcher.value.clone());
86            }
87        }
88    }
89
90    // If no __schema__ found, look for __database__
91    for query in &request.queries {
92        for matcher in &query.matchers {
93            if matcher.name == DATABASE_LABEL && matcher.r#type == MatcherType::Eq as i32 {
94                return Some(matcher.value.clone());
95            }
96        }
97    }
98
99    None
100}
101
102/// Create a DataFrame from a remote Query
103#[tracing::instrument(skip_all)]
104pub fn query_to_plan(dataframe: DataFrame, q: &Query) -> Result<LogicalPlan> {
105    let start_timestamp_ms = q.start_timestamp_ms;
106    let end_timestamp_ms = q.end_timestamp_ms;
107
108    let label_matches = &q.matchers;
109
110    let mut conditions = Vec::with_capacity(label_matches.len() + 1);
111
112    conditions.push(col(greptime_timestamp()).gt_eq(lit_timestamp_millisecond(start_timestamp_ms)));
113    conditions.push(col(greptime_timestamp()).lt_eq(lit_timestamp_millisecond(end_timestamp_ms)));
114
115    for m in label_matches {
116        let name = &m.name;
117
118        if name == METRIC_NAME_LABEL || name == SCHEMA_LABEL || name == DATABASE_LABEL {
119            continue;
120        }
121
122        let value = &m.value;
123        let m_type = MatcherType::try_from(m.r#type).map_err(|e| {
124            error::InvalidPromRemoteRequestSnafu {
125                msg: format!("invalid LabelMatcher type, decode error: {e}",),
126            }
127            .build()
128        })?;
129
130        match m_type {
131            MatcherType::Eq => {
132                conditions.push(col(name).eq(lit(value)));
133            }
134            MatcherType::Neq => {
135                conditions.push(col(name).not_eq(lit(value)));
136            }
137            // Case sensitive regexp match
138            MatcherType::Re => {
139                conditions.push(regexp_match(col(name), lit(value), None).is_not_null());
140            }
141            // Case sensitive regexp not match
142            MatcherType::Nre => {
143                conditions.push(regexp_match(col(name), lit(value), None).is_null());
144            }
145        }
146    }
147
148    // Safety: conditions MUST not be empty, reduce always return Some(expr).
149    let conditions = conditions.into_iter().reduce(Expr::and).unwrap();
150
151    let dataframe = dataframe
152        .filter(conditions)
153        .context(error::DataFrameSnafu)?;
154
155    Ok(dataframe.into_parts().1)
156}
157
/// Builds a [`Label`] from an owned name/value pair.
#[inline]
fn new_label(name: String, value: String) -> Label {
    Label { name, value }
}
162
163fn lit_timestamp_millisecond(ts: i64) -> Expr {
164    Expr::Literal(ScalarValue::TimestampMillisecond(Some(ts), None), None)
165}
166
167// A timeseries id
168#[derive(Debug)]
169struct TimeSeriesId {
170    labels: Vec<Label>,
171}
172
173/// Because Label in protobuf doesn't impl `Eq`, so we have to do it by ourselves.
174impl PartialEq for TimeSeriesId {
175    fn eq(&self, other: &Self) -> bool {
176        if self.labels.len() != other.labels.len() {
177            return false;
178        }
179
180        self.labels
181            .iter()
182            .zip(other.labels.iter())
183            .all(|(l, r)| l.name == r.name && l.value == r.value)
184    }
185}
186impl Eq for TimeSeriesId {}
187
188impl Hash for TimeSeriesId {
189    fn hash<H: Hasher>(&self, state: &mut H) {
190        for label in &self.labels {
191            label.name.hash(state);
192            label.value.hash(state);
193        }
194    }
195}
196
197/// For Sorting timeseries
198impl Ord for TimeSeriesId {
199    fn cmp(&self, other: &Self) -> Ordering {
200        let ordering = self.labels.len().cmp(&other.labels.len());
201        if ordering != Ordering::Equal {
202            return ordering;
203        }
204
205        for (l, r) in self.labels.iter().zip(other.labels.iter()) {
206            let ordering = l.name.cmp(&r.name);
207
208            if ordering != Ordering::Equal {
209                return ordering;
210            }
211
212            let ordering = l.value.cmp(&r.value);
213
214            if ordering != Ordering::Equal {
215                return ordering;
216            }
217        }
218        Ordering::Equal
219    }
220}
221
222impl PartialOrd for TimeSeriesId {
223    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
224        Some(self.cmp(other))
225    }
226}
227
228/// Collect each row's timeseries id
229/// This processing is ugly, hope <https://github.com/GreptimeTeam/greptimedb/issues/336> making some progress in future.
230fn collect_timeseries_ids(table_name: &str, recordbatch: &RecordBatch) -> Vec<TimeSeriesId> {
231    let row_count = recordbatch.num_rows();
232    let mut timeseries_ids = Vec::with_capacity(row_count);
233
234    let column_names = recordbatch
235        .schema
236        .column_schemas()
237        .iter()
238        .map(|column_schema| &column_schema.name);
239    let columns = column_names
240        .enumerate()
241        .filter(|(_, column_name)| {
242            *column_name != greptime_timestamp() && *column_name != greptime_value()
243        })
244        .map(|(i, column_name)| {
245            (
246                column_name,
247                recordbatch.iter_column_as_string(i).collect::<Vec<_>>(),
248            )
249        })
250        .collect::<Vec<_>>();
251
252    for row in 0..row_count {
253        let mut labels = Vec::with_capacity(recordbatch.num_columns() - 1);
254        labels.push(new_label(
255            METRIC_NAME_LABEL.to_string(),
256            table_name.to_string(),
257        ));
258
259        for (column_name, column_values) in columns.iter() {
260            if let Some(value) = &column_values[row] {
261                labels.push(new_label((*column_name).clone(), value.clone()));
262            }
263        }
264        timeseries_ids.push(TimeSeriesId { labels });
265    }
266    timeseries_ids
267}
268
269pub fn recordbatches_to_timeseries(
270    table_name: &str,
271    recordbatches: RecordBatches,
272) -> Result<Vec<TimeSeries>> {
273    Ok(recordbatches
274        .take()
275        .into_iter()
276        .map(|x| recordbatch_to_timeseries(table_name, x))
277        .collect::<Result<Vec<_>>>()?
278        .into_iter()
279        .flatten()
280        .collect())
281}
282
283fn recordbatch_to_timeseries(table: &str, recordbatch: RecordBatch) -> Result<Vec<TimeSeries>> {
284    let ts_column = recordbatch.column_by_name(greptime_timestamp()).context(
285        error::InvalidPromRemoteReadQueryResultSnafu {
286            msg: "missing greptime_timestamp column in query result",
287        },
288    )?;
289    let ts_column = ts_column
290        .as_primitive_opt::<TimestampMillisecondType>()
291        .with_context(|| error::InvalidPromRemoteReadQueryResultSnafu {
292            msg: format!(
293                "Expect timestamp column of datatype Timestamp(Millisecond), actual {:?}",
294                ts_column.data_type()
295            ),
296        })?;
297
298    let field_column = recordbatch.column_by_name(greptime_value()).context(
299        error::InvalidPromRemoteReadQueryResultSnafu {
300            msg: "missing greptime_value column in query result",
301        },
302    )?;
303    let field_column = field_column
304        .as_primitive_opt::<Float64Type>()
305        .with_context(|| error::InvalidPromRemoteReadQueryResultSnafu {
306            msg: format!(
307                "Expect value column of datatype Float64, actual {:?}",
308                field_column.data_type()
309            ),
310        })?;
311
312    // First, collect each row's timeseries id
313    let timeseries_ids = collect_timeseries_ids(table, &recordbatch);
314    // Then, group timeseries by it's id.
315    let mut timeseries_map: BTreeMap<&TimeSeriesId, TimeSeries> = BTreeMap::default();
316
317    for (row, timeseries_id) in timeseries_ids.iter().enumerate() {
318        let timeseries = timeseries_map
319            .entry(timeseries_id)
320            .or_insert_with(|| TimeSeries {
321                labels: timeseries_id.labels.clone(),
322                ..Default::default()
323            });
324
325        if ts_column.is_null(row) || field_column.is_null(row) {
326            continue;
327        }
328
329        let value = field_column.value(row);
330        let timestamp = ts_column.value(row);
331        let sample = Sample { value, timestamp };
332
333        timeseries.samples.push(sample);
334    }
335
336    Ok(timeseries_map.into_values().collect())
337}
338
339pub fn to_grpc_row_insert_requests(request: &WriteRequest) -> Result<(RowInsertRequests, usize)> {
340    let _timer = crate::metrics::METRIC_HTTP_PROM_STORE_CONVERT_ELAPSED.start_timer();
341
342    let mut multi_table_data = MultiTableData::new();
343
344    for series in &request.timeseries {
345        let table_name = &series
346            .labels
347            .iter()
348            .find(|label| {
349                // The metric name is a special label
350                label.name == METRIC_NAME_LABEL
351            })
352            .context(error::InvalidPromRemoteRequestSnafu {
353                msg: "missing '__name__' label in time-series",
354            })?
355            .value;
356
357        // The metric name is a special label,
358        // num_columns = labels.len() - 1 + 1 (value) + 1 (timestamp)
359        let num_columns = series.labels.len() + 1;
360
361        let table_data = multi_table_data.get_or_default_table_data(
362            table_name,
363            num_columns,
364            series.samples.len(),
365        );
366
367        // labels
368        let kvs = series.labels.iter().filter_map(|label| {
369            if label.name == METRIC_NAME_LABEL {
370                None
371            } else {
372                Some((label.name.clone(), label.value.clone()))
373            }
374        });
375
376        if series.samples.len() == 1 {
377            let mut one_row = table_data.alloc_one_row();
378
379            row_writer::write_tags(table_data, kvs, &mut one_row)?;
380            // value
381            row_writer::write_f64(
382                table_data,
383                greptime_value(),
384                series.samples[0].value,
385                &mut one_row,
386            )?;
387            // timestamp
388            row_writer::write_ts_to_millis(
389                table_data,
390                greptime_timestamp(),
391                Some(series.samples[0].timestamp),
392                Precision::Millisecond,
393                &mut one_row,
394            )?;
395
396            table_data.add_row(one_row);
397        } else {
398            for Sample { value, timestamp } in &series.samples {
399                let mut one_row = table_data.alloc_one_row();
400
401                // labels
402                let kvs = kvs.clone();
403                row_writer::write_tags(table_data, kvs, &mut one_row)?;
404                // value
405                row_writer::write_f64(table_data, greptime_value(), *value, &mut one_row)?;
406                // timestamp
407                row_writer::write_ts_to_millis(
408                    table_data,
409                    greptime_timestamp(),
410                    Some(*timestamp),
411                    Precision::Millisecond,
412                    &mut one_row,
413                )?;
414
415                table_data.add_row(one_row);
416            }
417        }
418
419        if !series.histograms.is_empty() {
420            warn!("Native histograms are not supported yet, data ignored");
421        }
422    }
423
424    Ok(multi_table_data.into_row_insert_requests())
425}
426
427#[inline]
428pub fn snappy_decompress(buf: &[u8]) -> Result<Vec<u8>> {
429    let mut decoder = Decoder::new();
430    decoder
431        .decompress_vec(buf)
432        .context(error::DecompressSnappyPromRemoteRequestSnafu)
433}
434
435#[inline]
436pub fn snappy_compress(buf: &[u8]) -> Result<Vec<u8>> {
437    let mut encoder = Encoder::new();
438    encoder
439        .compress_vec(buf)
440        .context(error::CompressPromRemoteRequestSnafu)
441}
442
443#[inline]
444pub fn zstd_decompress(buf: &[u8]) -> Result<Vec<u8>> {
445    zstd::stream::decode_all(buf).context(error::DecompressZstdPromRemoteRequestSnafu)
446}
447
448/// Mock timeseries for test, it is both used in servers and frontend crate
449/// So we present it here
450pub fn mock_timeseries() -> Vec<TimeSeries> {
451    vec![
452        TimeSeries {
453            labels: vec![
454                new_label(METRIC_NAME_LABEL.to_string(), "metric1".to_string()),
455                new_label("job".to_string(), "spark".to_string()),
456            ],
457            samples: vec![
458                Sample {
459                    value: 1.0f64,
460                    timestamp: 1000,
461                },
462                Sample {
463                    value: 2.0f64,
464                    timestamp: 2000,
465                },
466            ],
467            ..Default::default()
468        },
469        TimeSeries {
470            labels: vec![
471                new_label(METRIC_NAME_LABEL.to_string(), "metric2".to_string()),
472                new_label("instance".to_string(), "test_host1".to_string()),
473                new_label("idc".to_string(), "z001".to_string()),
474            ],
475            samples: vec![
476                Sample {
477                    value: 3.0f64,
478                    timestamp: 1000,
479                },
480                Sample {
481                    value: 4.0f64,
482                    timestamp: 2000,
483                },
484            ],
485            ..Default::default()
486        },
487        TimeSeries {
488            labels: vec![
489                new_label(METRIC_NAME_LABEL.to_string(), "metric3".to_string()),
490                new_label("idc".to_string(), "z002".to_string()),
491                new_label("app".to_string(), "biz".to_string()),
492            ],
493            samples: vec![
494                Sample {
495                    value: 5.0f64,
496                    timestamp: 1000,
497                },
498                Sample {
499                    value: 6.0f64,
500                    timestamp: 2000,
501                },
502                Sample {
503                    value: 7.0f64,
504                    timestamp: 3000,
505                },
506            ],
507            ..Default::default()
508        },
509    ]
510}
511
512/// Add new labels to the mock timeseries.
513pub fn mock_timeseries_new_label() -> Vec<TimeSeries> {
514    let ts_demo_metrics = TimeSeries {
515        labels: vec![
516            new_label(METRIC_NAME_LABEL.to_string(), "demo_metrics".to_string()),
517            new_label("idc".to_string(), "idc3".to_string()),
518            new_label("new_label1".to_string(), "foo".to_string()),
519        ],
520        samples: vec![Sample {
521            value: 42.0,
522            timestamp: 3000,
523        }],
524        ..Default::default()
525    };
526    let ts_multi_labels = TimeSeries {
527        labels: vec![
528            new_label(METRIC_NAME_LABEL.to_string(), "metric1".to_string()),
529            new_label("idc".to_string(), "idc4".to_string()),
530            new_label("env".to_string(), "prod".to_string()),
531            new_label("host".to_string(), "host9".to_string()),
532            new_label("new_label2".to_string(), "bar".to_string()),
533        ],
534        samples: vec![Sample {
535            value: 99.0,
536            timestamp: 4000,
537        }],
538        ..Default::default()
539    };
540
541    vec![ts_demo_metrics, ts_multi_labels]
542}
543
544/// Add new labels to the mock timeseries.
545pub fn mock_timeseries_special_labels() -> Vec<TimeSeries> {
546    let idc3_schema = TimeSeries {
547        labels: vec![
548            new_label(METRIC_NAME_LABEL.to_string(), "idc3_lo_table".to_string()),
549            new_label("__database__".to_string(), "idc3".to_string()),
550            new_label("__physical_table__".to_string(), "f1".to_string()),
551        ],
552        samples: vec![Sample {
553            value: 42.0,
554            timestamp: 3000,
555        }],
556        ..Default::default()
557    };
558    let idc4_schema = TimeSeries {
559        labels: vec![
560            new_label(
561                METRIC_NAME_LABEL.to_string(),
562                "idc4_local_table".to_string(),
563            ),
564            new_label("__database__".to_string(), "idc4".to_string()),
565            new_label("__physical_table__".to_string(), "f2".to_string()),
566        ],
567        samples: vec![Sample {
568            value: 99.0,
569            timestamp: 4000,
570        }],
571        ..Default::default()
572    };
573
574    vec![idc3_schema, idc4_schema]
575}
576
577#[cfg(test)]
578mod tests {
579    use std::sync::Arc;
580
581    use api::prom_store::remote::LabelMatcher;
582    use api::v1::{ColumnDataType, Row, SemanticType};
583    use datafusion::prelude::SessionContext;
584    use datatypes::data_type::ConcreteDataType;
585    use datatypes::schema::{ColumnSchema, Schema};
586    use datatypes::vectors::{Float64Vector, StringVector, TimestampMillisecondVector};
587    use table::table::adapter::DfTableProviderAdapter;
588    use table::test_util::MemTable;
589
590    use super::*;
591
592    const EQ_TYPE: i32 = MatcherType::Eq as i32;
593    const NEQ_TYPE: i32 = MatcherType::Neq as i32;
594    const RE_TYPE: i32 = MatcherType::Re as i32;
595
596    #[test]
597    fn test_table_name() {
598        let q = Query {
599            start_timestamp_ms: 1000,
600            end_timestamp_ms: 2000,
601            matchers: vec![],
602            ..Default::default()
603        };
604        let err = table_name(&q).unwrap_err();
605        assert!(matches!(err, error::Error::InvalidPromRemoteRequest { .. }));
606
607        let q = Query {
608            start_timestamp_ms: 1000,
609            end_timestamp_ms: 2000,
610            matchers: vec![LabelMatcher {
611                name: METRIC_NAME_LABEL.to_string(),
612                value: "test".to_string(),
613                r#type: EQ_TYPE,
614            }],
615            ..Default::default()
616        };
617        assert_eq!("test", table_name(&q).unwrap());
618    }
619
620    #[test]
621    fn test_query_to_plan() {
622        let q = Query {
623            start_timestamp_ms: 1000,
624            end_timestamp_ms: 2000,
625            matchers: vec![LabelMatcher {
626                name: METRIC_NAME_LABEL.to_string(),
627                value: "test".to_string(),
628                r#type: EQ_TYPE,
629            }],
630            ..Default::default()
631        };
632
633        let schema = Arc::new(Schema::new(vec![
634            ColumnSchema::new(
635                greptime_timestamp(),
636                ConcreteDataType::timestamp_millisecond_datatype(),
637                true,
638            ),
639            ColumnSchema::new(greptime_value(), ConcreteDataType::float64_datatype(), true),
640            ColumnSchema::new("instance", ConcreteDataType::string_datatype(), true),
641            ColumnSchema::new("job", ConcreteDataType::string_datatype(), true),
642        ]));
643        let recordbatch = RecordBatch::new(
644            schema,
645            vec![
646                Arc::new(TimestampMillisecondVector::from_vec(vec![1000])) as _,
647                Arc::new(Float64Vector::from_vec(vec![3.0])) as _,
648                Arc::new(StringVector::from(vec!["host1"])) as _,
649                Arc::new(StringVector::from(vec!["job"])) as _,
650            ],
651        )
652        .unwrap();
653
654        let ctx = SessionContext::new();
655        let table = MemTable::table("test", recordbatch);
656        let table_provider = Arc::new(DfTableProviderAdapter::new(table));
657
658        let dataframe = ctx.read_table(table_provider.clone()).unwrap();
659        let plan = query_to_plan(dataframe, &q).unwrap();
660        let display_string = format!("{}", plan.display_indent());
661
662        let ts_col = greptime_timestamp();
663        let expected = format!(
664            "Filter: ?table?.{} >= TimestampMillisecond(1000, None) AND ?table?.{} <= TimestampMillisecond(2000, None)\n  TableScan: ?table?",
665            ts_col, ts_col
666        );
667        assert_eq!(expected, display_string);
668
669        let q = Query {
670            start_timestamp_ms: 1000,
671            end_timestamp_ms: 2000,
672            matchers: vec![
673                LabelMatcher {
674                    name: METRIC_NAME_LABEL.to_string(),
675                    value: "test".to_string(),
676                    r#type: EQ_TYPE,
677                },
678                LabelMatcher {
679                    name: "job".to_string(),
680                    value: "*prom*".to_string(),
681                    r#type: RE_TYPE,
682                },
683                LabelMatcher {
684                    name: "instance".to_string(),
685                    value: "localhost".to_string(),
686                    r#type: NEQ_TYPE,
687                },
688            ],
689            ..Default::default()
690        };
691
692        let dataframe = ctx.read_table(table_provider).unwrap();
693        let plan = query_to_plan(dataframe, &q).unwrap();
694        let display_string = format!("{}", plan.display_indent());
695
696        let ts_col = greptime_timestamp();
697        let expected = format!(
698            "Filter: ?table?.{} >= TimestampMillisecond(1000, None) AND ?table?.{} <= TimestampMillisecond(2000, None) AND regexp_match(?table?.job, Utf8(\"*prom*\")) IS NOT NULL AND ?table?.instance != Utf8(\"localhost\")\n  TableScan: ?table?",
699            ts_col, ts_col
700        );
701        assert_eq!(expected, display_string);
702    }
703
704    fn column_schemas_with(
705        mut kts_iter: Vec<(&str, ColumnDataType, SemanticType)>,
706    ) -> Vec<api::v1::ColumnSchema> {
707        kts_iter.push((
708            greptime_value(),
709            ColumnDataType::Float64,
710            SemanticType::Field,
711        ));
712        kts_iter.push((
713            greptime_timestamp(),
714            ColumnDataType::TimestampMillisecond,
715            SemanticType::Timestamp,
716        ));
717
718        kts_iter
719            .into_iter()
720            .map(|(k, t, s)| api::v1::ColumnSchema {
721                column_name: k.to_string(),
722                datatype: t as i32,
723                semantic_type: s as i32,
724                ..Default::default()
725            })
726            .collect()
727    }
728
729    fn make_row_with_label(l1: &str, value: f64, timestamp: i64) -> Row {
730        Row {
731            values: vec![
732                api::v1::Value {
733                    value_data: Some(api::v1::value::ValueData::StringValue(l1.to_string())),
734                },
735                api::v1::Value {
736                    value_data: Some(api::v1::value::ValueData::F64Value(value)),
737                },
738                api::v1::Value {
739                    value_data: Some(api::v1::value::ValueData::TimestampMillisecondValue(
740                        timestamp,
741                    )),
742                },
743            ],
744        }
745    }
746
747    fn make_row_with_2_labels(l1: &str, l2: &str, value: f64, timestamp: i64) -> Row {
748        Row {
749            values: vec![
750                api::v1::Value {
751                    value_data: Some(api::v1::value::ValueData::StringValue(l1.to_string())),
752                },
753                api::v1::Value {
754                    value_data: Some(api::v1::value::ValueData::StringValue(l2.to_string())),
755                },
756                api::v1::Value {
757                    value_data: Some(api::v1::value::ValueData::F64Value(value)),
758                },
759                api::v1::Value {
760                    value_data: Some(api::v1::value::ValueData::TimestampMillisecondValue(
761                        timestamp,
762                    )),
763                },
764            ],
765        }
766    }
767
768    #[test]
769    fn test_write_request_to_row_insert_exprs() {
770        let write_request = WriteRequest {
771            timeseries: mock_timeseries(),
772            ..Default::default()
773        };
774
775        let mut exprs = to_grpc_row_insert_requests(&write_request)
776            .unwrap()
777            .0
778            .inserts;
779        exprs.sort_unstable_by(|l, r| l.table_name.cmp(&r.table_name));
780        assert_eq!(3, exprs.len());
781        assert_eq!("metric1", exprs[0].table_name);
782        assert_eq!("metric2", exprs[1].table_name);
783        assert_eq!("metric3", exprs[2].table_name);
784
785        let rows = exprs[0].rows.as_ref().unwrap();
786        let schema = &rows.schema;
787        let rows = &rows.rows;
788        assert_eq!(2, rows.len());
789        assert_eq!(3, schema.len());
790        assert_eq!(
791            column_schemas_with(vec![("job", ColumnDataType::String, SemanticType::Tag)]),
792            *schema
793        );
794        assert_eq!(
795            &vec![
796                make_row_with_label("spark", 1.0, 1000),
797                make_row_with_label("spark", 2.0, 2000),
798            ],
799            rows
800        );
801
802        let rows = exprs[1].rows.as_ref().unwrap();
803        let schema = &rows.schema;
804        let rows = &rows.rows;
805        assert_eq!(2, rows.len());
806        assert_eq!(4, schema.len());
807        assert_eq!(
808            column_schemas_with(vec![
809                ("instance", ColumnDataType::String, SemanticType::Tag),
810                ("idc", ColumnDataType::String, SemanticType::Tag)
811            ]),
812            *schema
813        );
814        assert_eq!(
815            &vec![
816                make_row_with_2_labels("test_host1", "z001", 3.0, 1000),
817                make_row_with_2_labels("test_host1", "z001", 4.0, 2000),
818            ],
819            rows
820        );
821
822        let rows = exprs[2].rows.as_ref().unwrap();
823        let schema = &rows.schema;
824        let rows = &rows.rows;
825        assert_eq!(3, rows.len());
826        assert_eq!(4, schema.len());
827        assert_eq!(
828            column_schemas_with(vec![
829                ("idc", ColumnDataType::String, SemanticType::Tag),
830                ("app", ColumnDataType::String, SemanticType::Tag)
831            ]),
832            *schema
833        );
834        assert_eq!(
835            &vec![
836                make_row_with_2_labels("z002", "biz", 5.0, 1000),
837                make_row_with_2_labels("z002", "biz", 6.0, 2000),
838                make_row_with_2_labels("z002", "biz", 7.0, 3000),
839            ],
840            rows
841        );
842    }
843
844    #[test]
845    fn test_recordbatches_to_timeseries() {
846        let schema = Arc::new(Schema::new(vec![
847            ColumnSchema::new(
848                greptime_timestamp(),
849                ConcreteDataType::timestamp_millisecond_datatype(),
850                true,
851            ),
852            ColumnSchema::new(greptime_value(), ConcreteDataType::float64_datatype(), true),
853            ColumnSchema::new("instance", ConcreteDataType::string_datatype(), true),
854        ]));
855
856        let recordbatches = RecordBatches::try_new(
857            schema.clone(),
858            vec![
859                RecordBatch::new(
860                    schema.clone(),
861                    vec![
862                        Arc::new(TimestampMillisecondVector::from_vec(vec![1000])) as _,
863                        Arc::new(Float64Vector::from_vec(vec![3.0])) as _,
864                        Arc::new(StringVector::from(vec!["host1"])) as _,
865                    ],
866                )
867                .unwrap(),
868                RecordBatch::new(
869                    schema,
870                    vec![
871                        Arc::new(TimestampMillisecondVector::from_vec(vec![2000])) as _,
872                        Arc::new(Float64Vector::from_vec(vec![7.0])) as _,
873                        Arc::new(StringVector::from(vec!["host2"])) as _,
874                    ],
875                )
876                .unwrap(),
877            ],
878        )
879        .unwrap();
880
881        let timeseries = recordbatches_to_timeseries("metric1", recordbatches).unwrap();
882        assert_eq!(2, timeseries.len());
883
884        assert_eq!(
885            vec![
886                Label {
887                    name: METRIC_NAME_LABEL.to_string(),
888                    value: "metric1".to_string(),
889                },
890                Label {
891                    name: "instance".to_string(),
892                    value: "host1".to_string(),
893                },
894            ],
895            timeseries[0].labels
896        );
897
898        assert_eq!(
899            timeseries[0].samples,
900            vec![Sample {
901                value: 3.0,
902                timestamp: 1000,
903            }]
904        );
905
906        assert_eq!(
907            vec![
908                Label {
909                    name: METRIC_NAME_LABEL.to_string(),
910                    value: "metric1".to_string(),
911                },
912                Label {
913                    name: "instance".to_string(),
914                    value: "host2".to_string(),
915                },
916            ],
917            timeseries[1].labels
918        );
919        assert_eq!(
920            timeseries[1].samples,
921            vec![Sample {
922                value: 7.0,
923                timestamp: 2000,
924            }]
925        );
926    }
927}