servers/
prom_store.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
//! Prometheus protocol support.
//! Handles the Prometheus remote_write and remote_read logic.
17use std::cmp::Ordering;
18use std::collections::BTreeMap;
19use std::hash::{Hash, Hasher};
20
21use api::prom_store::remote::label_matcher::Type as MatcherType;
22use api::prom_store::remote::{Label, Query, ReadRequest, Sample, TimeSeries, WriteRequest};
23use api::v1::RowInsertRequests;
24use arrow::array::{Array, AsArray};
25use arrow::datatypes::{Float64Type, TimestampMillisecondType};
26use common_grpc::precision::Precision;
27use common_query::prelude::{greptime_timestamp, greptime_value};
28use common_recordbatch::{RecordBatch, RecordBatches};
29use common_telemetry::{tracing, warn};
30use datafusion::dataframe::DataFrame;
31use datafusion::prelude::{Expr, col, lit, regexp_match};
32use datafusion_common::ScalarValue;
33use datafusion_expr::LogicalPlan;
34use openmetrics_parser::{MetricsExposition, PrometheusType, PrometheusValue};
35use snafu::{OptionExt, ResultExt};
36use snap::raw::{Decoder, Encoder};
37
38use crate::error::{self, Result};
39use crate::row_writer::{self, MultiTableData};
40
/// Label carrying the metric name; its value selects the target table.
pub const METRIC_NAME_LABEL: &str = "__name__";
pub const METRIC_NAME_LABEL_BYTES: &[u8] = METRIC_NAME_LABEL.as_bytes();

/// Special label for selecting the database name on remote write.
pub const DATABASE_LABEL: &str = "x_greptime_database";
pub const DATABASE_LABEL_BYTES: &[u8] = DATABASE_LABEL.as_bytes();
/// Alternative spelling of [`DATABASE_LABEL`].
pub const DATABASE_LABEL_ALT: &str = "__database__";
pub const DATABASE_LABEL_ALT_BYTES: &[u8] = DATABASE_LABEL_ALT.as_bytes();

/// Deprecated database selection label; use [`DATABASE_LABEL`] instead.
#[deprecated(note = "use DATABASE_LABEL instead")]
pub const SCHEMA_LABEL: &str = "__schema__";
#[deprecated(note = "use DATABASE_LABEL_BYTES instead")]
#[allow(deprecated)] // derived from the (also deprecated) `SCHEMA_LABEL`
pub const SCHEMA_LABEL_BYTES: &[u8] = SCHEMA_LABEL.as_bytes();

/// Special label for selecting the physical table name on remote write.
pub const PHYSICAL_TABLE_LABEL: &str = "x_greptime_physical_table";
pub const PHYSICAL_TABLE_LABEL_BYTES: &[u8] = PHYSICAL_TABLE_LABEL.as_bytes();
/// Alternative spelling of [`PHYSICAL_TABLE_LABEL`].
pub const PHYSICAL_TABLE_LABEL_ALT: &str = "__physical_table__";
pub const PHYSICAL_TABLE_LABEL_ALT_BYTES: &[u8] = PHYSICAL_TABLE_LABEL_ALT.as_bytes();

/// The same as `FIELD_COLUMN_MATCHER` in `promql` crate
pub const FIELD_NAME_LABEL: &str = "__field__";
64
65/// Check if given label is a special label for remote write
66#[allow(deprecated)]
67pub fn is_remote_write_special_label(label: &str) -> bool {
68    label == DATABASE_LABEL
69        || label == DATABASE_LABEL_ALT
70        || label == PHYSICAL_TABLE_LABEL
71        || label == PHYSICAL_TABLE_LABEL_ALT
72        || label == SCHEMA_LABEL
73}
74
75#[allow(deprecated)]
76pub fn is_remote_read_special_label(label: &str) -> bool {
77    label == METRIC_NAME_LABEL
78        || label == DATABASE_LABEL
79        || label == DATABASE_LABEL_ALT
80        || label == SCHEMA_LABEL
81}
82
83/// Check if given label is a database selection label
84#[allow(deprecated)]
85pub fn is_database_selection_label(label: &str) -> bool {
86    label == DATABASE_LABEL || label == DATABASE_LABEL_ALT || label == SCHEMA_LABEL
87}
88
89/// Check if given label is a physical table selection label
90pub fn is_physical_table_selection_label(label: &str) -> bool {
91    label == PHYSICAL_TABLE_LABEL || label == PHYSICAL_TABLE_LABEL_ALT
92}
93
/// Metrics for the push gateway protocol.
pub struct Metrics {
    /// The parsed exposition, typed with `openmetrics_parser`'s Prometheus metric
    /// type and value representations.
    pub exposition: MetricsExposition<PrometheusType, PrometheusValue>,
}
98
99/// Get table name from remote query
100pub fn table_name(q: &Query) -> Result<String> {
101    let label_matches = &q.matchers;
102
103    label_matches
104        .iter()
105        .find_map(|m| {
106            if m.name == METRIC_NAME_LABEL {
107                Some(m.value.clone())
108            } else {
109                None
110            }
111        })
112        .context(error::InvalidPromRemoteRequestSnafu {
113            msg: "missing '__name__' label in timeseries",
114        })
115}
116
117/// Extract schema from remote read request. Returns the first schema found from any query's matchers.
118pub fn extract_schema_from_read_request(request: &ReadRequest) -> Option<String> {
119    for query in &request.queries {
120        for matcher in &query.matchers {
121            if is_database_selection_label(&matcher.name)
122                && matcher.r#type == MatcherType::Eq as i32
123            {
124                return Some(matcher.value.clone());
125            }
126        }
127    }
128
129    None
130}
131
132/// Create a DataFrame from a remote Query
133#[tracing::instrument(skip_all)]
134pub fn query_to_plan(dataframe: DataFrame, q: &Query) -> Result<LogicalPlan> {
135    let start_timestamp_ms = q.start_timestamp_ms;
136    let end_timestamp_ms = q.end_timestamp_ms;
137
138    let label_matches = &q.matchers;
139
140    let mut conditions = Vec::with_capacity(label_matches.len() + 1);
141
142    conditions.push(col(greptime_timestamp()).gt_eq(lit_timestamp_millisecond(start_timestamp_ms)));
143    conditions.push(col(greptime_timestamp()).lt_eq(lit_timestamp_millisecond(end_timestamp_ms)));
144
145    for m in label_matches {
146        let name = &m.name;
147
148        if is_remote_read_special_label(name) {
149            continue;
150        }
151
152        let value = &m.value;
153        let m_type = MatcherType::try_from(m.r#type).map_err(|e| {
154            error::InvalidPromRemoteRequestSnafu {
155                msg: format!("invalid LabelMatcher type, decode error: {e}",),
156            }
157            .build()
158        })?;
159
160        match m_type {
161            MatcherType::Eq => {
162                conditions.push(col(name).eq(lit(value)));
163            }
164            MatcherType::Neq => {
165                conditions.push(col(name).not_eq(lit(value)));
166            }
167            // Case sensitive regexp match
168            MatcherType::Re => {
169                conditions.push(regexp_match(col(name), lit(value), None).is_not_null());
170            }
171            // Case sensitive regexp not match
172            MatcherType::Nre => {
173                conditions.push(regexp_match(col(name), lit(value), None).is_null());
174            }
175        }
176    }
177
178    // Safety: conditions MUST not be empty, reduce always return Some(expr).
179    let conditions = conditions.into_iter().reduce(Expr::and).unwrap();
180
181    let dataframe = dataframe
182        .filter(conditions)
183        .context(error::DataFrameSnafu)?;
184
185    Ok(dataframe.into_parts().1)
186}
187
188#[inline]
189fn new_label(name: String, value: String) -> Label {
190    Label { name, value }
191}
192
193fn lit_timestamp_millisecond(ts: i64) -> Expr {
194    Expr::Literal(ScalarValue::TimestampMillisecond(Some(ts), None), None)
195}
196
197// A timeseries id
198#[derive(Debug)]
199struct TimeSeriesId {
200    labels: Vec<Label>,
201}
202
203/// Because Label in protobuf doesn't impl `Eq`, so we have to do it by ourselves.
204impl PartialEq for TimeSeriesId {
205    fn eq(&self, other: &Self) -> bool {
206        if self.labels.len() != other.labels.len() {
207            return false;
208        }
209
210        self.labels
211            .iter()
212            .zip(other.labels.iter())
213            .all(|(l, r)| l.name == r.name && l.value == r.value)
214    }
215}
216impl Eq for TimeSeriesId {}
217
218impl Hash for TimeSeriesId {
219    fn hash<H: Hasher>(&self, state: &mut H) {
220        for label in &self.labels {
221            label.name.hash(state);
222            label.value.hash(state);
223        }
224    }
225}
226
227/// For Sorting timeseries
228impl Ord for TimeSeriesId {
229    fn cmp(&self, other: &Self) -> Ordering {
230        let ordering = self.labels.len().cmp(&other.labels.len());
231        if ordering != Ordering::Equal {
232            return ordering;
233        }
234
235        for (l, r) in self.labels.iter().zip(other.labels.iter()) {
236            let ordering = l.name.cmp(&r.name);
237
238            if ordering != Ordering::Equal {
239                return ordering;
240            }
241
242            let ordering = l.value.cmp(&r.value);
243
244            if ordering != Ordering::Equal {
245                return ordering;
246            }
247        }
248        Ordering::Equal
249    }
250}
251
252impl PartialOrd for TimeSeriesId {
253    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
254        Some(self.cmp(other))
255    }
256}
257
258/// Collect each row's timeseries id
259/// This processing is ugly, hope <https://github.com/GreptimeTeam/greptimedb/issues/336> making some progress in future.
260fn collect_timeseries_ids(table_name: &str, recordbatch: &RecordBatch) -> Vec<TimeSeriesId> {
261    let row_count = recordbatch.num_rows();
262    let mut timeseries_ids = Vec::with_capacity(row_count);
263
264    let column_names = recordbatch
265        .schema
266        .column_schemas()
267        .iter()
268        .map(|column_schema| &column_schema.name);
269    let columns = column_names
270        .enumerate()
271        .filter(|(_, column_name)| {
272            *column_name != greptime_timestamp() && *column_name != greptime_value()
273        })
274        .map(|(i, column_name)| {
275            (
276                column_name,
277                recordbatch.iter_column_as_string(i).collect::<Vec<_>>(),
278            )
279        })
280        .collect::<Vec<_>>();
281
282    for row in 0..row_count {
283        let mut labels = Vec::with_capacity(recordbatch.num_columns() - 1);
284        labels.push(new_label(
285            METRIC_NAME_LABEL.to_string(),
286            table_name.to_string(),
287        ));
288
289        for (column_name, column_values) in columns.iter() {
290            if let Some(value) = &column_values[row] {
291                labels.push(new_label((*column_name).clone(), value.clone()));
292            }
293        }
294        timeseries_ids.push(TimeSeriesId { labels });
295    }
296    timeseries_ids
297}
298
299pub fn recordbatches_to_timeseries(
300    table_name: &str,
301    recordbatches: RecordBatches,
302) -> Result<Vec<TimeSeries>> {
303    Ok(recordbatches
304        .take()
305        .into_iter()
306        .map(|x| recordbatch_to_timeseries(table_name, x))
307        .collect::<Result<Vec<_>>>()?
308        .into_iter()
309        .flatten()
310        .collect())
311}
312
313fn recordbatch_to_timeseries(table: &str, recordbatch: RecordBatch) -> Result<Vec<TimeSeries>> {
314    let ts_column = recordbatch.column_by_name(greptime_timestamp()).context(
315        error::InvalidPromRemoteReadQueryResultSnafu {
316            msg: "missing greptime_timestamp column in query result",
317        },
318    )?;
319    let ts_column = ts_column
320        .as_primitive_opt::<TimestampMillisecondType>()
321        .with_context(|| error::InvalidPromRemoteReadQueryResultSnafu {
322            msg: format!(
323                "Expect timestamp column of datatype Timestamp(Millisecond), actual {:?}",
324                ts_column.data_type()
325            ),
326        })?;
327
328    let field_column = recordbatch.column_by_name(greptime_value()).context(
329        error::InvalidPromRemoteReadQueryResultSnafu {
330            msg: "missing greptime_value column in query result",
331        },
332    )?;
333    let field_column = field_column
334        .as_primitive_opt::<Float64Type>()
335        .with_context(|| error::InvalidPromRemoteReadQueryResultSnafu {
336            msg: format!(
337                "Expect value column of datatype Float64, actual {:?}",
338                field_column.data_type()
339            ),
340        })?;
341
342    // First, collect each row's timeseries id
343    let timeseries_ids = collect_timeseries_ids(table, &recordbatch);
344    // Then, group timeseries by it's id.
345    let mut timeseries_map: BTreeMap<&TimeSeriesId, TimeSeries> = BTreeMap::default();
346
347    for (row, timeseries_id) in timeseries_ids.iter().enumerate() {
348        let timeseries = timeseries_map
349            .entry(timeseries_id)
350            .or_insert_with(|| TimeSeries {
351                labels: timeseries_id.labels.clone(),
352                ..Default::default()
353            });
354
355        if ts_column.is_null(row) || field_column.is_null(row) {
356            continue;
357        }
358
359        let value = field_column.value(row);
360        let timestamp = ts_column.value(row);
361        let sample = Sample { value, timestamp };
362
363        timeseries.samples.push(sample);
364    }
365
366    Ok(timeseries_map.into_values().collect())
367}
368
369pub fn to_grpc_row_insert_requests(request: &WriteRequest) -> Result<(RowInsertRequests, usize)> {
370    let _timer = crate::metrics::METRIC_HTTP_PROM_STORE_CONVERT_ELAPSED.start_timer();
371
372    let mut multi_table_data = MultiTableData::new();
373
374    for series in &request.timeseries {
375        let table_name = &series
376            .labels
377            .iter()
378            .find(|label| {
379                // The metric name is a special label
380                label.name == METRIC_NAME_LABEL
381            })
382            .context(error::InvalidPromRemoteRequestSnafu {
383                msg: "missing '__name__' label in time-series",
384            })?
385            .value;
386
387        // The metric name is a special label,
388        // num_columns = labels.len() - 1 + 1 (value) + 1 (timestamp)
389        let num_columns = series.labels.len() + 1;
390
391        let table_data = multi_table_data.get_or_default_table_data(
392            table_name,
393            num_columns,
394            series.samples.len(),
395        );
396
397        // labels
398        let kvs = series.labels.iter().filter_map(|label| {
399            if label.name == METRIC_NAME_LABEL {
400                None
401            } else {
402                Some((label.name.clone(), label.value.clone()))
403            }
404        });
405
406        if series.samples.len() == 1 {
407            let mut one_row = table_data.alloc_one_row();
408
409            row_writer::write_tags(table_data, kvs, &mut one_row)?;
410            // value
411            row_writer::write_f64(
412                table_data,
413                greptime_value(),
414                series.samples[0].value,
415                &mut one_row,
416            )?;
417            // timestamp
418            row_writer::write_ts_to_millis(
419                table_data,
420                greptime_timestamp(),
421                Some(series.samples[0].timestamp),
422                Precision::Millisecond,
423                &mut one_row,
424            )?;
425
426            table_data.add_row(one_row);
427        } else {
428            for Sample { value, timestamp } in &series.samples {
429                let mut one_row = table_data.alloc_one_row();
430
431                // labels
432                let kvs = kvs.clone();
433                row_writer::write_tags(table_data, kvs, &mut one_row)?;
434                // value
435                row_writer::write_f64(table_data, greptime_value(), *value, &mut one_row)?;
436                // timestamp
437                row_writer::write_ts_to_millis(
438                    table_data,
439                    greptime_timestamp(),
440                    Some(*timestamp),
441                    Precision::Millisecond,
442                    &mut one_row,
443                )?;
444
445                table_data.add_row(one_row);
446            }
447        }
448
449        if !series.histograms.is_empty() {
450            warn!("Native histograms are not supported yet, data ignored");
451        }
452    }
453
454    Ok(multi_table_data.into_row_insert_requests())
455}
456
457#[inline]
458pub fn snappy_decompress(buf: &[u8]) -> Result<Vec<u8>> {
459    let mut decoder = Decoder::new();
460    decoder
461        .decompress_vec(buf)
462        .context(error::DecompressSnappyPromRemoteRequestSnafu)
463}
464
465#[inline]
466pub fn snappy_compress(buf: &[u8]) -> Result<Vec<u8>> {
467    let mut encoder = Encoder::new();
468    encoder
469        .compress_vec(buf)
470        .context(error::CompressPromRemoteRequestSnafu)
471}
472
473#[inline]
474pub fn zstd_decompress(buf: &[u8]) -> Result<Vec<u8>> {
475    zstd::stream::decode_all(buf).context(error::DecompressZstdPromRemoteRequestSnafu)
476}
477
478/// Mock timeseries for test, it is both used in servers and frontend crate
479/// So we present it here
480pub fn mock_timeseries() -> Vec<TimeSeries> {
481    vec![
482        TimeSeries {
483            labels: vec![
484                new_label(METRIC_NAME_LABEL.to_string(), "metric1".to_string()),
485                new_label("job".to_string(), "spark".to_string()),
486            ],
487            samples: vec![
488                Sample {
489                    value: 1.0f64,
490                    timestamp: 1000,
491                },
492                Sample {
493                    value: 2.0f64,
494                    timestamp: 2000,
495                },
496            ],
497            ..Default::default()
498        },
499        TimeSeries {
500            labels: vec![
501                new_label(METRIC_NAME_LABEL.to_string(), "metric2".to_string()),
502                new_label("instance".to_string(), "test_host1".to_string()),
503                new_label("idc".to_string(), "z001".to_string()),
504            ],
505            samples: vec![
506                Sample {
507                    value: 3.0f64,
508                    timestamp: 1000,
509                },
510                Sample {
511                    value: 4.0f64,
512                    timestamp: 2000,
513                },
514            ],
515            ..Default::default()
516        },
517        TimeSeries {
518            labels: vec![
519                new_label(METRIC_NAME_LABEL.to_string(), "metric3".to_string()),
520                new_label("idc".to_string(), "z002".to_string()),
521                new_label("app".to_string(), "biz".to_string()),
522            ],
523            samples: vec![
524                Sample {
525                    value: 5.0f64,
526                    timestamp: 1000,
527                },
528                Sample {
529                    value: 6.0f64,
530                    timestamp: 2000,
531                },
532                Sample {
533                    value: 7.0f64,
534                    timestamp: 3000,
535                },
536            ],
537            ..Default::default()
538        },
539    ]
540}
541
542/// Add new labels to the mock timeseries.
543pub fn mock_timeseries_new_label() -> Vec<TimeSeries> {
544    let ts_demo_metrics = TimeSeries {
545        labels: vec![
546            new_label(METRIC_NAME_LABEL.to_string(), "demo_metrics".to_string()),
547            new_label("idc".to_string(), "idc3".to_string()),
548            new_label("new_label1".to_string(), "foo".to_string()),
549        ],
550        samples: vec![Sample {
551            value: 42.0,
552            timestamp: 3000,
553        }],
554        ..Default::default()
555    };
556    let ts_multi_labels = TimeSeries {
557        labels: vec![
558            new_label(METRIC_NAME_LABEL.to_string(), "metric1".to_string()),
559            new_label("idc".to_string(), "idc4".to_string()),
560            new_label("env".to_string(), "prod".to_string()),
561            new_label("host".to_string(), "host9".to_string()),
562            new_label("new_label2".to_string(), "bar".to_string()),
563        ],
564        samples: vec![Sample {
565            value: 99.0,
566            timestamp: 4000,
567        }],
568        ..Default::default()
569    };
570
571    vec![ts_demo_metrics, ts_multi_labels]
572}
573
574/// Add new labels to the mock timeseries.
575pub fn mock_timeseries_special_labels() -> Vec<TimeSeries> {
576    let idc3_schema = TimeSeries {
577        labels: vec![
578            new_label(METRIC_NAME_LABEL.to_string(), "idc3_lo_table".to_string()),
579            new_label(DATABASE_LABEL.to_string(), "idc3".to_string()),
580            new_label(PHYSICAL_TABLE_LABEL.to_string(), "f1".to_string()),
581        ],
582        samples: vec![Sample {
583            value: 42.0,
584            timestamp: 3000,
585        }],
586        ..Default::default()
587    };
588    let idc4_schema = TimeSeries {
589        labels: vec![
590            new_label(
591                METRIC_NAME_LABEL.to_string(),
592                "idc4_local_table".to_string(),
593            ),
594            new_label(DATABASE_LABEL.to_string(), "idc4".to_string()),
595            new_label(PHYSICAL_TABLE_LABEL.to_string(), "f2".to_string()),
596        ],
597        samples: vec![Sample {
598            value: 99.0,
599            timestamp: 4000,
600        }],
601        ..Default::default()
602    };
603
604    vec![idc3_schema, idc4_schema]
605}
606
/// Unit tests for remote read/write conversion in this module.
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use api::prom_store::remote::LabelMatcher;
    use api::v1::{ColumnDataType, Row, SemanticType};
    use datafusion::prelude::SessionContext;
    use datatypes::data_type::ConcreteDataType;
    use datatypes::schema::{ColumnSchema, Schema};
    use datatypes::vectors::{Float64Vector, StringVector, TimestampMillisecondVector};
    use table::table::adapter::DfTableProviderAdapter;
    use table::test_util::MemTable;

    use super::*;

    // `LabelMatcher.r#type` is a raw protobuf `i32`, so the matcher kinds are
    // pre-converted once here.
    const EQ_TYPE: i32 = MatcherType::Eq as i32;
    const NEQ_TYPE: i32 = MatcherType::Neq as i32;
    const RE_TYPE: i32 = MatcherType::Re as i32;

    /// `table_name` errors without a `__name__` matcher and returns its value otherwise.
    #[test]
    fn test_table_name() {
        let q = Query {
            start_timestamp_ms: 1000,
            end_timestamp_ms: 2000,
            matchers: vec![],
            ..Default::default()
        };
        let err = table_name(&q).unwrap_err();
        assert!(matches!(err, error::Error::InvalidPromRemoteRequest { .. }));

        let q = Query {
            start_timestamp_ms: 1000,
            end_timestamp_ms: 2000,
            matchers: vec![LabelMatcher {
                name: METRIC_NAME_LABEL.to_string(),
                value: "test".to_string(),
                r#type: EQ_TYPE,
            }],
            ..Default::default()
        };
        assert_eq!("test", table_name(&q).unwrap());
    }

    /// Checks the rendered filter produced by `query_to_plan`.
    #[test]
    fn test_query_to_plan() {
        // Only a `__name__` matcher: it is a special label and is skipped, so just the
        // time-range predicates remain.
        let q = Query {
            start_timestamp_ms: 1000,
            end_timestamp_ms: 2000,
            matchers: vec![LabelMatcher {
                name: METRIC_NAME_LABEL.to_string(),
                value: "test".to_string(),
                r#type: EQ_TYPE,
            }],
            ..Default::default()
        };

        let schema = Arc::new(Schema::new(vec![
            ColumnSchema::new(
                greptime_timestamp(),
                ConcreteDataType::timestamp_millisecond_datatype(),
                true,
            ),
            ColumnSchema::new(greptime_value(), ConcreteDataType::float64_datatype(), true),
            ColumnSchema::new("instance", ConcreteDataType::string_datatype(), true),
            ColumnSchema::new("job", ConcreteDataType::string_datatype(), true),
        ]));
        let recordbatch = RecordBatch::new(
            schema,
            vec![
                Arc::new(TimestampMillisecondVector::from_vec(vec![1000])) as _,
                Arc::new(Float64Vector::from_vec(vec![3.0])) as _,
                Arc::new(StringVector::from(vec!["host1"])) as _,
                Arc::new(StringVector::from(vec!["job"])) as _,
            ],
        )
        .unwrap();

        let ctx = SessionContext::new();
        let table = MemTable::table("test", recordbatch);
        let table_provider = Arc::new(DfTableProviderAdapter::new(table));

        let dataframe = ctx.read_table(table_provider.clone()).unwrap();
        let plan = query_to_plan(dataframe, &q).unwrap();
        let display_string = format!("{}", plan.display_indent());

        let ts_col = greptime_timestamp();
        let expected = format!(
            "Filter: ?table?.{} >= TimestampMillisecond(1000, None) AND ?table?.{} <= TimestampMillisecond(2000, None)\n  TableScan: ?table?",
            ts_col, ts_col
        );
        assert_eq!(expected, display_string);

        // Regex and not-equal matchers are AND-ed after the time range, in matcher order.
        // NOTE(review): `*prom*` would not compile as a regex; that is harmless here
        // because the plan is only displayed, never executed.
        let q = Query {
            start_timestamp_ms: 1000,
            end_timestamp_ms: 2000,
            matchers: vec![
                LabelMatcher {
                    name: METRIC_NAME_LABEL.to_string(),
                    value: "test".to_string(),
                    r#type: EQ_TYPE,
                },
                LabelMatcher {
                    name: "job".to_string(),
                    value: "*prom*".to_string(),
                    r#type: RE_TYPE,
                },
                LabelMatcher {
                    name: "instance".to_string(),
                    value: "localhost".to_string(),
                    r#type: NEQ_TYPE,
                },
            ],
            ..Default::default()
        };

        let dataframe = ctx.read_table(table_provider).unwrap();
        let plan = query_to_plan(dataframe, &q).unwrap();
        let display_string = format!("{}", plan.display_indent());

        let ts_col = greptime_timestamp();
        let expected = format!(
            "Filter: ?table?.{} >= TimestampMillisecond(1000, None) AND ?table?.{} <= TimestampMillisecond(2000, None) AND regexp_match(?table?.job, Utf8(\"*prom*\")) IS NOT NULL AND ?table?.instance != Utf8(\"localhost\")\n  TableScan: ?table?",
            ts_col, ts_col
        );
        assert_eq!(expected, display_string);
    }

    /// Builds the expected row schema: the given tag columns followed by the Float64
    /// value field and the millisecond timestamp column.
    fn column_schemas_with(
        mut kts_iter: Vec<(&str, ColumnDataType, SemanticType)>,
    ) -> Vec<api::v1::ColumnSchema> {
        kts_iter.push((
            greptime_value(),
            ColumnDataType::Float64,
            SemanticType::Field,
        ));
        kts_iter.push((
            greptime_timestamp(),
            ColumnDataType::TimestampMillisecond,
            SemanticType::Timestamp,
        ));

        kts_iter
            .into_iter()
            .map(|(k, t, s)| api::v1::ColumnSchema {
                column_name: k.to_string(),
                datatype: t as i32,
                semantic_type: s as i32,
                ..Default::default()
            })
            .collect()
    }

    /// Expected row with one string tag, a value and a timestamp.
    fn make_row_with_label(l1: &str, value: f64, timestamp: i64) -> Row {
        Row {
            values: vec![
                api::v1::Value {
                    value_data: Some(api::v1::value::ValueData::StringValue(l1.to_string())),
                },
                api::v1::Value {
                    value_data: Some(api::v1::value::ValueData::F64Value(value)),
                },
                api::v1::Value {
                    value_data: Some(api::v1::value::ValueData::TimestampMillisecondValue(
                        timestamp,
                    )),
                },
            ],
        }
    }

    /// Expected row with two string tags, a value and a timestamp.
    fn make_row_with_2_labels(l1: &str, l2: &str, value: f64, timestamp: i64) -> Row {
        Row {
            values: vec![
                api::v1::Value {
                    value_data: Some(api::v1::value::ValueData::StringValue(l1.to_string())),
                },
                api::v1::Value {
                    value_data: Some(api::v1::value::ValueData::StringValue(l2.to_string())),
                },
                api::v1::Value {
                    value_data: Some(api::v1::value::ValueData::F64Value(value)),
                },
                api::v1::Value {
                    value_data: Some(api::v1::value::ValueData::TimestampMillisecondValue(
                        timestamp,
                    )),
                },
            ],
        }
    }

    /// Each mock series becomes one table keyed by `__name__`, with one row per sample.
    #[test]
    fn test_write_request_to_row_insert_exprs() {
        let write_request = WriteRequest {
            timeseries: mock_timeseries(),
            ..Default::default()
        };

        let mut exprs = to_grpc_row_insert_requests(&write_request)
            .unwrap()
            .0
            .inserts;
        // Sort by table name so the assertions do not depend on output order.
        exprs.sort_unstable_by(|l, r| l.table_name.cmp(&r.table_name));
        assert_eq!(3, exprs.len());
        assert_eq!("metric1", exprs[0].table_name);
        assert_eq!("metric2", exprs[1].table_name);
        assert_eq!("metric3", exprs[2].table_name);

        let rows = exprs[0].rows.as_ref().unwrap();
        let schema = &rows.schema;
        let rows = &rows.rows;
        assert_eq!(2, rows.len());
        assert_eq!(3, schema.len());
        assert_eq!(
            column_schemas_with(vec![("job", ColumnDataType::String, SemanticType::Tag)]),
            *schema
        );
        assert_eq!(
            &vec![
                make_row_with_label("spark", 1.0, 1000),
                make_row_with_label("spark", 2.0, 2000),
            ],
            rows
        );

        let rows = exprs[1].rows.as_ref().unwrap();
        let schema = &rows.schema;
        let rows = &rows.rows;
        assert_eq!(2, rows.len());
        assert_eq!(4, schema.len());
        assert_eq!(
            column_schemas_with(vec![
                ("instance", ColumnDataType::String, SemanticType::Tag),
                ("idc", ColumnDataType::String, SemanticType::Tag)
            ]),
            *schema
        );
        assert_eq!(
            &vec![
                make_row_with_2_labels("test_host1", "z001", 3.0, 1000),
                make_row_with_2_labels("test_host1", "z001", 4.0, 2000),
            ],
            rows
        );

        let rows = exprs[2].rows.as_ref().unwrap();
        let schema = &rows.schema;
        let rows = &rows.rows;
        assert_eq!(3, rows.len());
        assert_eq!(4, schema.len());
        assert_eq!(
            column_schemas_with(vec![
                ("idc", ColumnDataType::String, SemanticType::Tag),
                ("app", ColumnDataType::String, SemanticType::Tag)
            ]),
            *schema
        );
        assert_eq!(
            &vec![
                make_row_with_2_labels("z002", "biz", 5.0, 1000),
                make_row_with_2_labels("z002", "biz", 6.0, 2000),
                make_row_with_2_labels("z002", "biz", 7.0, 3000),
            ],
            rows
        );
    }

    /// Two batches with distinct `instance` values yield two series, each carrying
    /// `__name__` plus the tag and a single sample.
    #[test]
    fn test_recordbatches_to_timeseries() {
        let schema = Arc::new(Schema::new(vec![
            ColumnSchema::new(
                greptime_timestamp(),
                ConcreteDataType::timestamp_millisecond_datatype(),
                true,
            ),
            ColumnSchema::new(greptime_value(), ConcreteDataType::float64_datatype(), true),
            ColumnSchema::new("instance", ConcreteDataType::string_datatype(), true),
        ]));

        let recordbatches = RecordBatches::try_new(
            schema.clone(),
            vec![
                RecordBatch::new(
                    schema.clone(),
                    vec![
                        Arc::new(TimestampMillisecondVector::from_vec(vec![1000])) as _,
                        Arc::new(Float64Vector::from_vec(vec![3.0])) as _,
                        Arc::new(StringVector::from(vec!["host1"])) as _,
                    ],
                )
                .unwrap(),
                RecordBatch::new(
                    schema,
                    vec![
                        Arc::new(TimestampMillisecondVector::from_vec(vec![2000])) as _,
                        Arc::new(Float64Vector::from_vec(vec![7.0])) as _,
                        Arc::new(StringVector::from(vec!["host2"])) as _,
                    ],
                )
                .unwrap(),
            ],
        )
        .unwrap();

        let timeseries = recordbatches_to_timeseries("metric1", recordbatches).unwrap();
        assert_eq!(2, timeseries.len());

        assert_eq!(
            vec![
                Label {
                    name: METRIC_NAME_LABEL.to_string(),
                    value: "metric1".to_string(),
                },
                Label {
                    name: "instance".to_string(),
                    value: "host1".to_string(),
                },
            ],
            timeseries[0].labels
        );

        assert_eq!(
            timeseries[0].samples,
            vec![Sample {
                value: 3.0,
                timestamp: 1000,
            }]
        );

        assert_eq!(
            vec![
                Label {
                    name: METRIC_NAME_LABEL.to_string(),
                    value: "metric1".to_string(),
                },
                Label {
                    name: "instance".to_string(),
                    value: "host2".to_string(),
                },
            ],
            timeseries[1].labels
        );
        assert_eq!(
            timeseries[1].samples,
            vec![Sample {
                value: 7.0,
                timestamp: 2000,
            }]
        );
    }
}