1use std::cmp::Ordering;
18use std::collections::BTreeMap;
19use std::hash::{Hash, Hasher};
20
21use api::prom_store::remote::label_matcher::Type as MatcherType;
22use api::prom_store::remote::{Label, Query, Sample, TimeSeries, WriteRequest};
23use api::v1::RowInsertRequests;
24use common_grpc::precision::Precision;
25use common_query::prelude::{GREPTIME_TIMESTAMP, GREPTIME_VALUE};
26use common_recordbatch::{RecordBatch, RecordBatches};
27use common_telemetry::tracing;
28use common_time::timestamp::TimeUnit;
29use datafusion::prelude::{col, lit, regexp_match, Expr};
30use datafusion_common::ScalarValue;
31use datafusion_expr::LogicalPlan;
32use datatypes::prelude::{ConcreteDataType, Value};
33use openmetrics_parser::{MetricsExposition, PrometheusType, PrometheusValue};
34use query::dataframe::DataFrame;
35use snafu::{ensure, OptionExt, ResultExt};
36use snap::raw::{Decoder, Encoder};
37
38use crate::error::{self, Result};
39use crate::row_writer::{self, MultiTableData};
40
/// Label carrying the Prometheus metric name; its value is used as the table name.
pub const METRIC_NAME_LABEL: &str = "__name__";
/// Byte form of [`METRIC_NAME_LABEL`], derived from it so the two cannot drift apart.
pub const METRIC_NAME_LABEL_BYTES: &[u8] = METRIC_NAME_LABEL.as_bytes();

/// Reserved label `__database__`.
/// NOTE(review): not interpreted in this file; presumably selects the target database — confirm against callers.
pub const DATABASE_LABEL: &str = "__database__";
/// Byte form of [`DATABASE_LABEL`].
pub const DATABASE_LABEL_BYTES: &[u8] = DATABASE_LABEL.as_bytes();

/// Reserved label `__physical_table__`.
/// NOTE(review): not interpreted in this file; presumably names a backing physical table — confirm against callers.
pub const PHYSICAL_TABLE_LABEL: &str = "__physical_table__";
/// Byte form of [`PHYSICAL_TABLE_LABEL`].
pub const PHYSICAL_TABLE_LABEL_BYTES: &[u8] = PHYSICAL_TABLE_LABEL.as_bytes();

/// Reserved label `__field__` (not interpreted in this file).
pub const FIELD_NAME_LABEL: &str = "__field__";
52
/// Wrapper around a parsed metrics exposition whose metric types and values are
/// the Prometheus flavours from `openmetrics_parser`.
/// NOTE(review): presumably used for push-gateway style ingestion — confirm against callers.
pub struct Metrics {
    /// The parsed exposition (families of Prometheus-typed metrics).
    pub exposition: MetricsExposition<PrometheusType, PrometheusValue>,
}
57
58pub fn table_name(q: &Query) -> Result<String> {
60 let label_matches = &q.matchers;
61
62 label_matches
63 .iter()
64 .find_map(|m| {
65 if m.name == METRIC_NAME_LABEL {
66 Some(m.value.to_string())
67 } else {
68 None
69 }
70 })
71 .context(error::InvalidPromRemoteRequestSnafu {
72 msg: "missing '__name__' label in timeseries",
73 })
74}
75
76#[tracing::instrument(skip_all)]
78pub fn query_to_plan(dataframe: DataFrame, q: &Query) -> Result<LogicalPlan> {
79 let DataFrame::DataFusion(dataframe) = dataframe;
80
81 let start_timestamp_ms = q.start_timestamp_ms;
82 let end_timestamp_ms = q.end_timestamp_ms;
83
84 let label_matches = &q.matchers;
85
86 let mut conditions = Vec::with_capacity(label_matches.len() + 1);
87
88 conditions.push(col(GREPTIME_TIMESTAMP).gt_eq(lit_timestamp_millisecond(start_timestamp_ms)));
89 conditions.push(col(GREPTIME_TIMESTAMP).lt_eq(lit_timestamp_millisecond(end_timestamp_ms)));
90
91 for m in label_matches {
92 let name = &m.name;
93
94 if name == METRIC_NAME_LABEL {
95 continue;
96 }
97
98 let value = &m.value;
99 let m_type = MatcherType::try_from(m.r#type).map_err(|e| {
100 error::InvalidPromRemoteRequestSnafu {
101 msg: format!("invalid LabelMatcher type, decode error: {e}",),
102 }
103 .build()
104 })?;
105
106 match m_type {
107 MatcherType::Eq => {
108 conditions.push(col(name).eq(lit(value)));
109 }
110 MatcherType::Neq => {
111 conditions.push(col(name).not_eq(lit(value)));
112 }
113 MatcherType::Re => {
115 conditions.push(regexp_match(col(name), lit(value), None).is_not_null());
116 }
117 MatcherType::Nre => {
119 conditions.push(regexp_match(col(name), lit(value), None).is_null());
120 }
121 }
122 }
123
124 let conditions = conditions.into_iter().reduce(Expr::and).unwrap();
126
127 let dataframe = dataframe
128 .filter(conditions)
129 .context(error::DataFrameSnafu)?;
130
131 Ok(dataframe.into_parts().1)
132}
133
134#[inline]
135fn new_label(name: String, value: String) -> Label {
136 Label { name, value }
137}
138
139fn lit_timestamp_millisecond(ts: i64) -> Expr {
140 Expr::Literal(ScalarValue::TimestampMillisecond(Some(ts), None))
141}
142
143#[derive(Debug)]
145struct TimeSeriesId {
146 labels: Vec<Label>,
147}
148
149impl PartialEq for TimeSeriesId {
151 fn eq(&self, other: &Self) -> bool {
152 if self.labels.len() != other.labels.len() {
153 return false;
154 }
155
156 self.labels
157 .iter()
158 .zip(other.labels.iter())
159 .all(|(l, r)| l.name == r.name && l.value == r.value)
160 }
161}
162impl Eq for TimeSeriesId {}
163
164impl Hash for TimeSeriesId {
165 fn hash<H: Hasher>(&self, state: &mut H) {
166 for label in &self.labels {
167 label.name.hash(state);
168 label.value.hash(state);
169 }
170 }
171}
172
173impl Ord for TimeSeriesId {
175 fn cmp(&self, other: &Self) -> Ordering {
176 let ordering = self.labels.len().cmp(&other.labels.len());
177 if ordering != Ordering::Equal {
178 return ordering;
179 }
180
181 for (l, r) in self.labels.iter().zip(other.labels.iter()) {
182 let ordering = l.name.cmp(&r.name);
183
184 if ordering != Ordering::Equal {
185 return ordering;
186 }
187
188 let ordering = l.value.cmp(&r.value);
189
190 if ordering != Ordering::Equal {
191 return ordering;
192 }
193 }
194 Ordering::Equal
195 }
196}
197
198impl PartialOrd for TimeSeriesId {
199 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
200 Some(self.cmp(other))
201 }
202}
203
204fn collect_timeseries_ids(table_name: &str, recordbatch: &RecordBatch) -> Vec<TimeSeriesId> {
207 let row_count = recordbatch.num_rows();
208 let mut timeseries_ids = Vec::with_capacity(row_count);
209
210 for row in 0..row_count {
211 let mut labels = Vec::with_capacity(recordbatch.num_columns() - 1);
212 labels.push(new_label(
213 METRIC_NAME_LABEL.to_string(),
214 table_name.to_string(),
215 ));
216
217 for (i, column_schema) in recordbatch.schema.column_schemas().iter().enumerate() {
218 if column_schema.name == GREPTIME_VALUE || column_schema.name == GREPTIME_TIMESTAMP {
219 continue;
220 }
221
222 let column = &recordbatch.columns()[i];
223 if column.is_null(row) {
225 continue;
226 }
227
228 let value = column.get(row).to_string();
229 labels.push(new_label(column_schema.name.clone(), value));
230 }
231 timeseries_ids.push(TimeSeriesId { labels });
232 }
233 timeseries_ids
234}
235
236pub fn recordbatches_to_timeseries(
237 table_name: &str,
238 recordbatches: RecordBatches,
239) -> Result<Vec<TimeSeries>> {
240 Ok(recordbatches
241 .take()
242 .into_iter()
243 .map(|x| recordbatch_to_timeseries(table_name, x))
244 .collect::<Result<Vec<_>>>()?
245 .into_iter()
246 .flatten()
247 .collect())
248}
249
250fn recordbatch_to_timeseries(table: &str, recordbatch: RecordBatch) -> Result<Vec<TimeSeries>> {
251 let ts_column = recordbatch.column_by_name(GREPTIME_TIMESTAMP).context(
252 error::InvalidPromRemoteReadQueryResultSnafu {
253 msg: "missing greptime_timestamp column in query result",
254 },
255 )?;
256 ensure!(
257 ts_column.data_type() == ConcreteDataType::timestamp_millisecond_datatype(),
258 error::InvalidPromRemoteReadQueryResultSnafu {
259 msg: format!(
260 "Expect timestamp column of datatype Timestamp(Millisecond), actual {:?}",
261 ts_column.data_type()
262 )
263 }
264 );
265
266 let field_column = recordbatch.column_by_name(GREPTIME_VALUE).context(
267 error::InvalidPromRemoteReadQueryResultSnafu {
268 msg: "missing greptime_value column in query result",
269 },
270 )?;
271 ensure!(
272 field_column.data_type() == ConcreteDataType::float64_datatype(),
273 error::InvalidPromRemoteReadQueryResultSnafu {
274 msg: format!(
275 "Expect value column of datatype Float64, actual {:?}",
276 field_column.data_type()
277 )
278 }
279 );
280
281 let timeseries_ids = collect_timeseries_ids(table, &recordbatch);
283 let mut timeseries_map: BTreeMap<&TimeSeriesId, TimeSeries> = BTreeMap::default();
285
286 for (row, timeseries_id) in timeseries_ids.iter().enumerate() {
287 let timeseries = timeseries_map
288 .entry(timeseries_id)
289 .or_insert_with(|| TimeSeries {
290 labels: timeseries_id.labels.clone(),
291 ..Default::default()
292 });
293
294 if ts_column.is_null(row) || field_column.is_null(row) {
295 continue;
296 }
297
298 let value: f64 = match field_column.get(row) {
299 Value::Float64(value) => value.into(),
300 _ => unreachable!("checked by the \"ensure\" above"),
301 };
302 let timestamp = match ts_column.get(row) {
303 Value::Timestamp(t) if t.unit() == TimeUnit::Millisecond => t.value(),
304 _ => unreachable!("checked by the \"ensure\" above"),
305 };
306 let sample = Sample { value, timestamp };
307
308 timeseries.samples.push(sample);
309 }
310
311 Ok(timeseries_map.into_values().collect())
312}
313
314pub fn to_grpc_row_insert_requests(request: &WriteRequest) -> Result<(RowInsertRequests, usize)> {
315 let _timer = crate::metrics::METRIC_HTTP_PROM_STORE_CONVERT_ELAPSED.start_timer();
316
317 let mut multi_table_data = MultiTableData::new();
318
319 for series in &request.timeseries {
320 let table_name = &series
321 .labels
322 .iter()
323 .find(|label| {
324 label.name == METRIC_NAME_LABEL
326 })
327 .context(error::InvalidPromRemoteRequestSnafu {
328 msg: "missing '__name__' label in time-series",
329 })?
330 .value;
331
332 let num_columns = series.labels.len() + 1;
335
336 let table_data = multi_table_data.get_or_default_table_data(
337 table_name,
338 num_columns,
339 series.samples.len(),
340 );
341
342 let kvs = series.labels.iter().filter_map(|label| {
344 if label.name == METRIC_NAME_LABEL {
345 None
346 } else {
347 Some((label.name.clone(), label.value.clone()))
348 }
349 });
350
351 if series.samples.len() == 1 {
352 let mut one_row = table_data.alloc_one_row();
353
354 row_writer::write_tags(table_data, kvs, &mut one_row)?;
355 row_writer::write_f64(
357 table_data,
358 GREPTIME_VALUE,
359 series.samples[0].value,
360 &mut one_row,
361 )?;
362 row_writer::write_ts_to_millis(
364 table_data,
365 GREPTIME_TIMESTAMP,
366 Some(series.samples[0].timestamp),
367 Precision::Millisecond,
368 &mut one_row,
369 )?;
370
371 table_data.add_row(one_row);
372 } else {
373 for Sample { value, timestamp } in &series.samples {
374 let mut one_row = table_data.alloc_one_row();
375
376 let kvs = kvs.clone();
378 row_writer::write_tags(table_data, kvs, &mut one_row)?;
379 row_writer::write_f64(table_data, GREPTIME_VALUE, *value, &mut one_row)?;
381 row_writer::write_ts_to_millis(
383 table_data,
384 GREPTIME_TIMESTAMP,
385 Some(*timestamp),
386 Precision::Millisecond,
387 &mut one_row,
388 )?;
389
390 table_data.add_row(one_row);
391 }
392 }
393 }
394
395 Ok(multi_table_data.into_row_insert_requests())
396}
397
398#[inline]
399pub fn snappy_decompress(buf: &[u8]) -> Result<Vec<u8>> {
400 let mut decoder = Decoder::new();
401 decoder
402 .decompress_vec(buf)
403 .context(error::DecompressSnappyPromRemoteRequestSnafu)
404}
405
406#[inline]
407pub fn snappy_compress(buf: &[u8]) -> Result<Vec<u8>> {
408 let mut encoder = Encoder::new();
409 encoder
410 .compress_vec(buf)
411 .context(error::CompressPromRemoteRequestSnafu)
412}
413
414#[inline]
415pub fn zstd_decompress(buf: &[u8]) -> Result<Vec<u8>> {
416 zstd::stream::decode_all(buf).context(error::DecompressZstdPromRemoteRequestSnafu)
417}
418
419pub fn mock_timeseries() -> Vec<TimeSeries> {
422 vec![
423 TimeSeries {
424 labels: vec![
425 new_label(METRIC_NAME_LABEL.to_string(), "metric1".to_string()),
426 new_label("job".to_string(), "spark".to_string()),
427 ],
428 samples: vec![
429 Sample {
430 value: 1.0f64,
431 timestamp: 1000,
432 },
433 Sample {
434 value: 2.0f64,
435 timestamp: 2000,
436 },
437 ],
438 ..Default::default()
439 },
440 TimeSeries {
441 labels: vec![
442 new_label(METRIC_NAME_LABEL.to_string(), "metric2".to_string()),
443 new_label("instance".to_string(), "test_host1".to_string()),
444 new_label("idc".to_string(), "z001".to_string()),
445 ],
446 samples: vec![
447 Sample {
448 value: 3.0f64,
449 timestamp: 1000,
450 },
451 Sample {
452 value: 4.0f64,
453 timestamp: 2000,
454 },
455 ],
456 ..Default::default()
457 },
458 TimeSeries {
459 labels: vec![
460 new_label(METRIC_NAME_LABEL.to_string(), "metric3".to_string()),
461 new_label("idc".to_string(), "z002".to_string()),
462 new_label("app".to_string(), "biz".to_string()),
463 ],
464 samples: vec![
465 Sample {
466 value: 5.0f64,
467 timestamp: 1000,
468 },
469 Sample {
470 value: 6.0f64,
471 timestamp: 2000,
472 },
473 Sample {
474 value: 7.0f64,
475 timestamp: 3000,
476 },
477 ],
478 ..Default::default()
479 },
480 ]
481}
482
483pub fn mock_timeseries_new_label() -> Vec<TimeSeries> {
485 let ts_demo_metrics = TimeSeries {
486 labels: vec![
487 new_label(METRIC_NAME_LABEL.to_string(), "demo_metrics".to_string()),
488 new_label("idc".to_string(), "idc3".to_string()),
489 new_label("new_label1".to_string(), "foo".to_string()),
490 ],
491 samples: vec![Sample {
492 value: 42.0,
493 timestamp: 3000,
494 }],
495 ..Default::default()
496 };
497 let ts_multi_labels = TimeSeries {
498 labels: vec![
499 new_label(METRIC_NAME_LABEL.to_string(), "metric1".to_string()),
500 new_label("idc".to_string(), "idc4".to_string()),
501 new_label("env".to_string(), "prod".to_string()),
502 new_label("host".to_string(), "host9".to_string()),
503 new_label("new_label2".to_string(), "bar".to_string()),
504 ],
505 samples: vec![Sample {
506 value: 99.0,
507 timestamp: 4000,
508 }],
509 ..Default::default()
510 };
511
512 vec![ts_demo_metrics, ts_multi_labels]
513}
514
515pub fn mock_timeseries_special_labels() -> Vec<TimeSeries> {
517 let idc3_schema = TimeSeries {
518 labels: vec![
519 new_label(METRIC_NAME_LABEL.to_string(), "idc3_lo_table".to_string()),
520 new_label("__database__".to_string(), "idc3".to_string()),
521 new_label("__physical_table__".to_string(), "f1".to_string()),
522 ],
523 samples: vec![Sample {
524 value: 42.0,
525 timestamp: 3000,
526 }],
527 ..Default::default()
528 };
529 let idc4_schema = TimeSeries {
530 labels: vec![
531 new_label(
532 METRIC_NAME_LABEL.to_string(),
533 "idc4_local_table".to_string(),
534 ),
535 new_label("__database__".to_string(), "idc4".to_string()),
536 new_label("__physical_table__".to_string(), "f2".to_string()),
537 ],
538 samples: vec![Sample {
539 value: 99.0,
540 timestamp: 4000,
541 }],
542 ..Default::default()
543 };
544
545 vec![idc3_schema, idc4_schema]
546}
547
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use api::prom_store::remote::LabelMatcher;
    use api::v1::{ColumnDataType, Row, SemanticType};
    use datafusion::prelude::SessionContext;
    use datatypes::schema::{ColumnSchema, Schema};
    use datatypes::vectors::{Float64Vector, StringVector, TimestampMillisecondVector};
    use table::table::adapter::DfTableProviderAdapter;
    use table::test_util::MemTable;

    use super::*;

    // Raw `i32` encodings of the matcher types, as stored in `LabelMatcher::r#type`.
    const EQ_TYPE: i32 = MatcherType::Eq as i32;
    const NEQ_TYPE: i32 = MatcherType::Neq as i32;
    const RE_TYPE: i32 = MatcherType::Re as i32;

    /// `table_name` fails without a `__name__` matcher and returns its value otherwise.
    #[test]
    fn test_table_name() {
        let q = Query {
            start_timestamp_ms: 1000,
            end_timestamp_ms: 2000,
            matchers: vec![],
            ..Default::default()
        };
        let err = table_name(&q).unwrap_err();
        assert!(matches!(err, error::Error::InvalidPromRemoteRequest { .. }));

        let q = Query {
            start_timestamp_ms: 1000,
            end_timestamp_ms: 2000,
            matchers: vec![LabelMatcher {
                name: METRIC_NAME_LABEL.to_string(),
                value: "test".to_string(),
                r#type: EQ_TYPE,
            }],
            ..Default::default()
        };
        assert_eq!("test", table_name(&q).unwrap());
    }

    /// Checks the textual plan produced by `query_to_plan` for the timestamp bounds and
    /// for regex / not-equal matchers.
    #[test]
    fn test_query_to_plan() {
        let q = Query {
            start_timestamp_ms: 1000,
            end_timestamp_ms: 2000,
            matchers: vec![LabelMatcher {
                name: METRIC_NAME_LABEL.to_string(),
                value: "test".to_string(),
                r#type: EQ_TYPE,
            }],
            ..Default::default()
        };

        let schema = Arc::new(Schema::new(vec![
            ColumnSchema::new(
                GREPTIME_TIMESTAMP,
                ConcreteDataType::timestamp_millisecond_datatype(),
                true,
            ),
            ColumnSchema::new(GREPTIME_VALUE, ConcreteDataType::float64_datatype(), true),
            ColumnSchema::new("instance", ConcreteDataType::string_datatype(), true),
            ColumnSchema::new("job", ConcreteDataType::string_datatype(), true),
        ]));
        let recordbatch = RecordBatch::new(
            schema,
            vec![
                Arc::new(TimestampMillisecondVector::from_vec(vec![1000])) as _,
                Arc::new(Float64Vector::from_vec(vec![3.0])) as _,
                Arc::new(StringVector::from(vec!["host1"])) as _,
                Arc::new(StringVector::from(vec!["job"])) as _,
            ],
        )
        .unwrap();

        let ctx = SessionContext::new();
        let table = MemTable::table("test", recordbatch);
        let table_provider = Arc::new(DfTableProviderAdapter::new(table));

        // Only a `__name__` matcher: the filter holds just the two timestamp bounds.
        let dataframe = ctx.read_table(table_provider.clone()).unwrap();
        let plan = query_to_plan(DataFrame::DataFusion(dataframe), &q).unwrap();
        let display_string = format!("{}", plan.display_indent());

        assert_eq!("Filter: ?table?.greptime_timestamp >= TimestampMillisecond(1000, None) AND ?table?.greptime_timestamp <= TimestampMillisecond(2000, None)\n TableScan: ?table?", display_string);

        // `__name__` is skipped; the regex and `!=` matchers follow the bounds in request order.
        let q = Query {
            start_timestamp_ms: 1000,
            end_timestamp_ms: 2000,
            matchers: vec![
                LabelMatcher {
                    name: METRIC_NAME_LABEL.to_string(),
                    value: "test".to_string(),
                    r#type: EQ_TYPE,
                },
                LabelMatcher {
                    name: "job".to_string(),
                    value: "*prom*".to_string(),
                    r#type: RE_TYPE,
                },
                LabelMatcher {
                    name: "instance".to_string(),
                    value: "localhost".to_string(),
                    r#type: NEQ_TYPE,
                },
            ],
            ..Default::default()
        };

        let dataframe = ctx.read_table(table_provider).unwrap();
        let plan = query_to_plan(DataFrame::DataFusion(dataframe), &q).unwrap();
        let display_string = format!("{}", plan.display_indent());

        assert_eq!("Filter: ?table?.greptime_timestamp >= TimestampMillisecond(1000, None) AND ?table?.greptime_timestamp <= TimestampMillisecond(2000, None) AND regexp_match(?table?.job, Utf8(\"*prom*\")) IS NOT NULL AND ?table?.instance != Utf8(\"localhost\")\n TableScan: ?table?", display_string);
    }

    /// Builds the expected gRPC column schemas: the given tag columns followed by the
    /// `greptime_value` field and the `greptime_timestamp` timestamp column.
    fn column_schemas_with(
        mut kts_iter: Vec<(&str, ColumnDataType, SemanticType)>,
    ) -> Vec<api::v1::ColumnSchema> {
        kts_iter.push((
            "greptime_value",
            ColumnDataType::Float64,
            SemanticType::Field,
        ));
        kts_iter.push((
            "greptime_timestamp",
            ColumnDataType::TimestampMillisecond,
            SemanticType::Timestamp,
        ));

        kts_iter
            .into_iter()
            .map(|(k, t, s)| api::v1::ColumnSchema {
                column_name: k.to_string(),
                datatype: t as i32,
                semantic_type: s as i32,
                ..Default::default()
            })
            .collect()
    }

    /// Expected row with one string tag, then the value, then the millisecond timestamp.
    fn make_row_with_label(l1: &str, value: f64, timestamp: i64) -> Row {
        Row {
            values: vec![
                api::v1::Value {
                    value_data: Some(api::v1::value::ValueData::StringValue(l1.to_string())),
                },
                api::v1::Value {
                    value_data: Some(api::v1::value::ValueData::F64Value(value)),
                },
                api::v1::Value {
                    value_data: Some(api::v1::value::ValueData::TimestampMillisecondValue(
                        timestamp,
                    )),
                },
            ],
        }
    }

    /// Expected row with two string tags, then the value, then the millisecond timestamp.
    fn make_row_with_2_labels(l1: &str, l2: &str, value: f64, timestamp: i64) -> Row {
        Row {
            values: vec![
                api::v1::Value {
                    value_data: Some(api::v1::value::ValueData::StringValue(l1.to_string())),
                },
                api::v1::Value {
                    value_data: Some(api::v1::value::ValueData::StringValue(l2.to_string())),
                },
                api::v1::Value {
                    value_data: Some(api::v1::value::ValueData::F64Value(value)),
                },
                api::v1::Value {
                    value_data: Some(api::v1::value::ValueData::TimestampMillisecondValue(
                        timestamp,
                    )),
                },
            ],
        }
    }

    /// Converts `mock_timeseries` and checks one insert per metric, with the expected
    /// schema and one row per sample.
    #[test]
    fn test_write_request_to_row_insert_exprs() {
        let write_request = WriteRequest {
            timeseries: mock_timeseries(),
            ..Default::default()
        };

        let mut exprs = to_grpc_row_insert_requests(&write_request)
            .unwrap()
            .0
            .inserts;
        // Sort by table name so the assertions below do not depend on request order.
        exprs.sort_unstable_by(|l, r| l.table_name.cmp(&r.table_name));
        assert_eq!(3, exprs.len());
        assert_eq!("metric1", exprs[0].table_name);
        assert_eq!("metric2", exprs[1].table_name);
        assert_eq!("metric3", exprs[2].table_name);

        let rows = exprs[0].rows.as_ref().unwrap();
        let schema = &rows.schema;
        let rows = &rows.rows;
        assert_eq!(2, rows.len());
        assert_eq!(3, schema.len());
        assert_eq!(
            column_schemas_with(vec![("job", ColumnDataType::String, SemanticType::Tag)]),
            *schema
        );
        assert_eq!(
            &vec![
                make_row_with_label("spark", 1.0, 1000),
                make_row_with_label("spark", 2.0, 2000),
            ],
            rows
        );

        let rows = exprs[1].rows.as_ref().unwrap();
        let schema = &rows.schema;
        let rows = &rows.rows;
        assert_eq!(2, rows.len());
        assert_eq!(4, schema.len());
        assert_eq!(
            column_schemas_with(vec![
                ("instance", ColumnDataType::String, SemanticType::Tag),
                ("idc", ColumnDataType::String, SemanticType::Tag)
            ]),
            *schema
        );
        assert_eq!(
            &vec![
                make_row_with_2_labels("test_host1", "z001", 3.0, 1000),
                make_row_with_2_labels("test_host1", "z001", 4.0, 2000),
            ],
            rows
        );

        let rows = exprs[2].rows.as_ref().unwrap();
        let schema = &rows.schema;
        let rows = &rows.rows;
        assert_eq!(3, rows.len());
        assert_eq!(4, schema.len());
        assert_eq!(
            column_schemas_with(vec![
                ("idc", ColumnDataType::String, SemanticType::Tag),
                ("app", ColumnDataType::String, SemanticType::Tag)
            ]),
            *schema
        );
        assert_eq!(
            &vec![
                make_row_with_2_labels("z002", "biz", 5.0, 1000),
                make_row_with_2_labels("z002", "biz", 6.0, 2000),
                make_row_with_2_labels("z002", "biz", 7.0, 3000),
            ],
            rows
        );
    }

    /// Two single-row batches with different `instance` values yield two series, each
    /// labelled `__name__` then `instance`, carrying that row's sample.
    #[test]
    fn test_recordbatches_to_timeseries() {
        let schema = Arc::new(Schema::new(vec![
            ColumnSchema::new(
                GREPTIME_TIMESTAMP,
                ConcreteDataType::timestamp_millisecond_datatype(),
                true,
            ),
            ColumnSchema::new(GREPTIME_VALUE, ConcreteDataType::float64_datatype(), true),
            ColumnSchema::new("instance", ConcreteDataType::string_datatype(), true),
        ]));

        let recordbatches = RecordBatches::try_new(
            schema.clone(),
            vec![
                RecordBatch::new(
                    schema.clone(),
                    vec![
                        Arc::new(TimestampMillisecondVector::from_vec(vec![1000])) as _,
                        Arc::new(Float64Vector::from_vec(vec![3.0])) as _,
                        Arc::new(StringVector::from(vec!["host1"])) as _,
                    ],
                )
                .unwrap(),
                RecordBatch::new(
                    schema,
                    vec![
                        Arc::new(TimestampMillisecondVector::from_vec(vec![2000])) as _,
                        Arc::new(Float64Vector::from_vec(vec![7.0])) as _,
                        Arc::new(StringVector::from(vec!["host2"])) as _,
                    ],
                )
                .unwrap(),
            ],
        )
        .unwrap();

        let timeseries = recordbatches_to_timeseries("metric1", recordbatches).unwrap();
        assert_eq!(2, timeseries.len());

        assert_eq!(
            vec![
                Label {
                    name: METRIC_NAME_LABEL.to_string(),
                    value: "metric1".to_string(),
                },
                Label {
                    name: "instance".to_string(),
                    value: "host1".to_string(),
                },
            ],
            timeseries[0].labels
        );

        assert_eq!(
            timeseries[0].samples,
            vec![Sample {
                value: 3.0,
                timestamp: 1000,
            }]
        );

        assert_eq!(
            vec![
                Label {
                    name: METRIC_NAME_LABEL.to_string(),
                    value: "metric1".to_string(),
                },
                Label {
                    name: "instance".to_string(),
                    value: "host2".to_string(),
                },
            ],
            timeseries[1].labels
        );
        assert_eq!(
            timeseries[1].samples,
            vec![Sample {
                value: 7.0,
                timestamp: 2000,
            }]
        );
    }
}