1use std::cmp::Ordering;
18use std::collections::BTreeMap;
19use std::hash::{Hash, Hasher};
20
21use api::prom_store::remote::label_matcher::Type as MatcherType;
22use api::prom_store::remote::{Label, Query, Sample, TimeSeries, WriteRequest};
23use api::v1::RowInsertRequests;
24use common_grpc::precision::Precision;
25use common_query::prelude::{GREPTIME_TIMESTAMP, GREPTIME_VALUE};
26use common_recordbatch::{RecordBatch, RecordBatches};
27use common_telemetry::tracing;
28use common_time::timestamp::TimeUnit;
29use datafusion::prelude::{col, lit, regexp_match, Expr};
30use datafusion_common::ScalarValue;
31use datafusion_expr::LogicalPlan;
32use datatypes::prelude::{ConcreteDataType, Value};
33use openmetrics_parser::{MetricsExposition, PrometheusType, PrometheusValue};
34use query::dataframe::DataFrame;
35use snafu::{ensure, OptionExt, ResultExt};
36use snap::raw::{Decoder, Encoder};
37
38use crate::error::{self, Result};
39use crate::row_writer::{self, MultiTableData};
40
/// Name of the label that carries the metric name (`__name__`).
///
/// In this file the value attached to this label is used as the table name.
pub const METRIC_NAME_LABEL: &str = "__name__";

/// Byte-string form of the `__name__` label name.
pub const METRIC_NAME_LABEL_BYTES: &[u8] = b"__name__";

/// The reserved label name `__field__`.
///
/// NOTE(review): not referenced in the visible part of this file; presumably
/// used by callers to select a field column — confirm against its users.
pub const FIELD_NAME_LABEL: &str = "__field__";
47
/// Holds a `MetricsExposition` (from the `openmetrics_parser` crate).
///
/// NOTE(review): how the exposition is produced and consumed is not visible
/// in this file — confirm against callers.
pub struct Metrics {
    /// The exposition, parameterised with the Prometheus type and value kinds.
    pub exposition: MetricsExposition<PrometheusType, PrometheusValue>,
}
52
53pub fn table_name(q: &Query) -> Result<String> {
55 let label_matches = &q.matchers;
56
57 label_matches
58 .iter()
59 .find_map(|m| {
60 if m.name == METRIC_NAME_LABEL {
61 Some(m.value.to_string())
62 } else {
63 None
64 }
65 })
66 .context(error::InvalidPromRemoteRequestSnafu {
67 msg: "missing '__name__' label in timeseries",
68 })
69}
70
71#[tracing::instrument(skip_all)]
73pub fn query_to_plan(dataframe: DataFrame, q: &Query) -> Result<LogicalPlan> {
74 let DataFrame::DataFusion(dataframe) = dataframe;
75
76 let start_timestamp_ms = q.start_timestamp_ms;
77 let end_timestamp_ms = q.end_timestamp_ms;
78
79 let label_matches = &q.matchers;
80
81 let mut conditions = Vec::with_capacity(label_matches.len() + 1);
82
83 conditions.push(col(GREPTIME_TIMESTAMP).gt_eq(lit_timestamp_millisecond(start_timestamp_ms)));
84 conditions.push(col(GREPTIME_TIMESTAMP).lt_eq(lit_timestamp_millisecond(end_timestamp_ms)));
85
86 for m in label_matches {
87 let name = &m.name;
88
89 if name == METRIC_NAME_LABEL {
90 continue;
91 }
92
93 let value = &m.value;
94 let m_type = MatcherType::try_from(m.r#type).map_err(|e| {
95 error::InvalidPromRemoteRequestSnafu {
96 msg: format!("invalid LabelMatcher type, decode error: {e}",),
97 }
98 .build()
99 })?;
100
101 match m_type {
102 MatcherType::Eq => {
103 conditions.push(col(name).eq(lit(value)));
104 }
105 MatcherType::Neq => {
106 conditions.push(col(name).not_eq(lit(value)));
107 }
108 MatcherType::Re => {
110 conditions.push(regexp_match(col(name), lit(value), None).is_not_null());
111 }
112 MatcherType::Nre => {
114 conditions.push(regexp_match(col(name), lit(value), None).is_null());
115 }
116 }
117 }
118
119 let conditions = conditions.into_iter().reduce(Expr::and).unwrap();
121
122 let dataframe = dataframe
123 .filter(conditions)
124 .context(error::DataFrameSnafu)?;
125
126 Ok(dataframe.into_parts().1)
127}
128
/// Builds a `Label` from an owned name and value.
#[inline]
fn new_label(name: String, value: String) -> Label {
    Label { name, value }
}
133
134fn lit_timestamp_millisecond(ts: i64) -> Expr {
135 Expr::Literal(ScalarValue::TimestampMillisecond(Some(ts), None))
136}
137
/// Identity of a time series: its list of labels, in order.
///
/// Equality, hashing and ordering for this type are implemented by hand in
/// this file, comparing label names and values position by position.
#[derive(Debug)]
struct TimeSeriesId {
    /// Labels of the series; position matters for comparisons.
    labels: Vec<Label>,
}
143
144impl PartialEq for TimeSeriesId {
146 fn eq(&self, other: &Self) -> bool {
147 if self.labels.len() != other.labels.len() {
148 return false;
149 }
150
151 self.labels
152 .iter()
153 .zip(other.labels.iter())
154 .all(|(l, r)| l.name == r.name && l.value == r.value)
155 }
156}
157impl Eq for TimeSeriesId {}
158
159impl Hash for TimeSeriesId {
160 fn hash<H: Hasher>(&self, state: &mut H) {
161 for label in &self.labels {
162 label.name.hash(state);
163 label.value.hash(state);
164 }
165 }
166}
167
168impl Ord for TimeSeriesId {
170 fn cmp(&self, other: &Self) -> Ordering {
171 let ordering = self.labels.len().cmp(&other.labels.len());
172 if ordering != Ordering::Equal {
173 return ordering;
174 }
175
176 for (l, r) in self.labels.iter().zip(other.labels.iter()) {
177 let ordering = l.name.cmp(&r.name);
178
179 if ordering != Ordering::Equal {
180 return ordering;
181 }
182
183 let ordering = l.value.cmp(&r.value);
184
185 if ordering != Ordering::Equal {
186 return ordering;
187 }
188 }
189 Ordering::Equal
190 }
191}
192
193impl PartialOrd for TimeSeriesId {
194 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
195 Some(self.cmp(other))
196 }
197}
198
199fn collect_timeseries_ids(table_name: &str, recordbatch: &RecordBatch) -> Vec<TimeSeriesId> {
202 let row_count = recordbatch.num_rows();
203 let mut timeseries_ids = Vec::with_capacity(row_count);
204
205 for row in 0..row_count {
206 let mut labels = Vec::with_capacity(recordbatch.num_columns() - 1);
207 labels.push(new_label(
208 METRIC_NAME_LABEL.to_string(),
209 table_name.to_string(),
210 ));
211
212 for (i, column_schema) in recordbatch.schema.column_schemas().iter().enumerate() {
213 if column_schema.name == GREPTIME_VALUE || column_schema.name == GREPTIME_TIMESTAMP {
214 continue;
215 }
216
217 let column = &recordbatch.columns()[i];
218 if column.is_null(row) {
220 continue;
221 }
222
223 let value = column.get(row).to_string();
224 labels.push(new_label(column_schema.name.clone(), value));
225 }
226 timeseries_ids.push(TimeSeriesId { labels });
227 }
228 timeseries_ids
229}
230
231pub fn recordbatches_to_timeseries(
232 table_name: &str,
233 recordbatches: RecordBatches,
234) -> Result<Vec<TimeSeries>> {
235 Ok(recordbatches
236 .take()
237 .into_iter()
238 .map(|x| recordbatch_to_timeseries(table_name, x))
239 .collect::<Result<Vec<_>>>()?
240 .into_iter()
241 .flatten()
242 .collect())
243}
244
245fn recordbatch_to_timeseries(table: &str, recordbatch: RecordBatch) -> Result<Vec<TimeSeries>> {
246 let ts_column = recordbatch.column_by_name(GREPTIME_TIMESTAMP).context(
247 error::InvalidPromRemoteReadQueryResultSnafu {
248 msg: "missing greptime_timestamp column in query result",
249 },
250 )?;
251 ensure!(
252 ts_column.data_type() == ConcreteDataType::timestamp_millisecond_datatype(),
253 error::InvalidPromRemoteReadQueryResultSnafu {
254 msg: format!(
255 "Expect timestamp column of datatype Timestamp(Millisecond), actual {:?}",
256 ts_column.data_type()
257 )
258 }
259 );
260
261 let field_column = recordbatch.column_by_name(GREPTIME_VALUE).context(
262 error::InvalidPromRemoteReadQueryResultSnafu {
263 msg: "missing greptime_value column in query result",
264 },
265 )?;
266 ensure!(
267 field_column.data_type() == ConcreteDataType::float64_datatype(),
268 error::InvalidPromRemoteReadQueryResultSnafu {
269 msg: format!(
270 "Expect value column of datatype Float64, actual {:?}",
271 field_column.data_type()
272 )
273 }
274 );
275
276 let timeseries_ids = collect_timeseries_ids(table, &recordbatch);
278 let mut timeseries_map: BTreeMap<&TimeSeriesId, TimeSeries> = BTreeMap::default();
280
281 for (row, timeseries_id) in timeseries_ids.iter().enumerate() {
282 let timeseries = timeseries_map
283 .entry(timeseries_id)
284 .or_insert_with(|| TimeSeries {
285 labels: timeseries_id.labels.clone(),
286 ..Default::default()
287 });
288
289 if ts_column.is_null(row) || field_column.is_null(row) {
290 continue;
291 }
292
293 let value: f64 = match field_column.get(row) {
294 Value::Float64(value) => value.into(),
295 _ => unreachable!("checked by the \"ensure\" above"),
296 };
297 let timestamp = match ts_column.get(row) {
298 Value::Timestamp(t) if t.unit() == TimeUnit::Millisecond => t.value(),
299 _ => unreachable!("checked by the \"ensure\" above"),
300 };
301 let sample = Sample { value, timestamp };
302
303 timeseries.samples.push(sample);
304 }
305
306 Ok(timeseries_map.into_values().collect())
307}
308
309pub fn to_grpc_row_insert_requests(request: &WriteRequest) -> Result<(RowInsertRequests, usize)> {
310 let _timer = crate::metrics::METRIC_HTTP_PROM_STORE_CONVERT_ELAPSED.start_timer();
311
312 let mut multi_table_data = MultiTableData::new();
313
314 for series in &request.timeseries {
315 let table_name = &series
316 .labels
317 .iter()
318 .find(|label| {
319 label.name == METRIC_NAME_LABEL
321 })
322 .context(error::InvalidPromRemoteRequestSnafu {
323 msg: "missing '__name__' label in time-series",
324 })?
325 .value;
326
327 let num_columns = series.labels.len() + 1;
330
331 let table_data = multi_table_data.get_or_default_table_data(
332 table_name,
333 num_columns,
334 series.samples.len(),
335 );
336
337 let kvs = series.labels.iter().filter_map(|label| {
339 if label.name == METRIC_NAME_LABEL {
340 None
341 } else {
342 Some((label.name.clone(), label.value.clone()))
343 }
344 });
345
346 if series.samples.len() == 1 {
347 let mut one_row = table_data.alloc_one_row();
348
349 row_writer::write_tags(table_data, kvs, &mut one_row)?;
350 row_writer::write_f64(
352 table_data,
353 GREPTIME_VALUE,
354 series.samples[0].value,
355 &mut one_row,
356 )?;
357 row_writer::write_ts_to_millis(
359 table_data,
360 GREPTIME_TIMESTAMP,
361 Some(series.samples[0].timestamp),
362 Precision::Millisecond,
363 &mut one_row,
364 )?;
365
366 table_data.add_row(one_row);
367 } else {
368 for Sample { value, timestamp } in &series.samples {
369 let mut one_row = table_data.alloc_one_row();
370
371 let kvs = kvs.clone();
373 row_writer::write_tags(table_data, kvs, &mut one_row)?;
374 row_writer::write_f64(table_data, GREPTIME_VALUE, *value, &mut one_row)?;
376 row_writer::write_ts_to_millis(
378 table_data,
379 GREPTIME_TIMESTAMP,
380 Some(*timestamp),
381 Precision::Millisecond,
382 &mut one_row,
383 )?;
384
385 table_data.add_row(one_row);
386 }
387 }
388 }
389
390 Ok(multi_table_data.into_row_insert_requests())
391}
392
393#[inline]
394pub fn snappy_decompress(buf: &[u8]) -> Result<Vec<u8>> {
395 let mut decoder = Decoder::new();
396 decoder
397 .decompress_vec(buf)
398 .context(error::DecompressSnappyPromRemoteRequestSnafu)
399}
400
401#[inline]
402pub fn snappy_compress(buf: &[u8]) -> Result<Vec<u8>> {
403 let mut encoder = Encoder::new();
404 encoder
405 .compress_vec(buf)
406 .context(error::CompressPromRemoteRequestSnafu)
407}
408
409#[inline]
410pub fn zstd_decompress(buf: &[u8]) -> Result<Vec<u8>> {
411 zstd::stream::decode_all(buf).context(error::DecompressZstdPromRemoteRequestSnafu)
412}
413
414pub fn mock_timeseries() -> Vec<TimeSeries> {
417 vec![
418 TimeSeries {
419 labels: vec![
420 new_label(METRIC_NAME_LABEL.to_string(), "metric1".to_string()),
421 new_label("job".to_string(), "spark".to_string()),
422 ],
423 samples: vec![
424 Sample {
425 value: 1.0f64,
426 timestamp: 1000,
427 },
428 Sample {
429 value: 2.0f64,
430 timestamp: 2000,
431 },
432 ],
433 ..Default::default()
434 },
435 TimeSeries {
436 labels: vec![
437 new_label(METRIC_NAME_LABEL.to_string(), "metric2".to_string()),
438 new_label("instance".to_string(), "test_host1".to_string()),
439 new_label("idc".to_string(), "z001".to_string()),
440 ],
441 samples: vec![
442 Sample {
443 value: 3.0f64,
444 timestamp: 1000,
445 },
446 Sample {
447 value: 4.0f64,
448 timestamp: 2000,
449 },
450 ],
451 ..Default::default()
452 },
453 TimeSeries {
454 labels: vec![
455 new_label(METRIC_NAME_LABEL.to_string(), "metric3".to_string()),
456 new_label("idc".to_string(), "z002".to_string()),
457 new_label("app".to_string(), "biz".to_string()),
458 ],
459 samples: vec![
460 Sample {
461 value: 5.0f64,
462 timestamp: 1000,
463 },
464 Sample {
465 value: 6.0f64,
466 timestamp: 2000,
467 },
468 Sample {
469 value: 7.0f64,
470 timestamp: 3000,
471 },
472 ],
473 ..Default::default()
474 },
475 ]
476}
477
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use api::prom_store::remote::LabelMatcher;
    use api::v1::{ColumnDataType, Row, SemanticType};
    use datafusion::prelude::SessionContext;
    use datatypes::schema::{ColumnSchema, Schema};
    use datatypes::vectors::{Float64Vector, StringVector, TimestampMillisecondVector};
    use table::table::adapter::DfTableProviderAdapter;
    use table::test_util::MemTable;

    use super::*;

    const EQ_TYPE: i32 = MatcherType::Eq as i32;
    const NEQ_TYPE: i32 = MatcherType::Neq as i32;
    const RE_TYPE: i32 = MatcherType::Re as i32;

    /// Builds a label matcher with the given name, value and raw type.
    fn matcher(name: &str, value: &str, kind: i32) -> LabelMatcher {
        LabelMatcher {
            name: name.to_string(),
            value: value.to_string(),
            r#type: kind,
        }
    }

    /// Builds a query over the time range [1000, 2000] ms with `matchers`.
    fn query_with(matchers: Vec<LabelMatcher>) -> Query {
        Query {
            start_timestamp_ms: 1000,
            end_timestamp_ms: 2000,
            matchers,
            ..Default::default()
        }
    }

    /// A query without a `__name__` matcher is rejected; with one, its value
    /// is the table name.
    #[test]
    fn test_table_name() {
        let without_name = query_with(vec![]);
        let err = table_name(&without_name).unwrap_err();
        assert!(matches!(err, error::Error::InvalidPromRemoteRequest { .. }));

        let with_name = query_with(vec![matcher(METRIC_NAME_LABEL, "test", EQ_TYPE)]);
        assert_eq!("test", table_name(&with_name).unwrap());
    }

    /// Checks the rendered filter plan for a name-only query and for a query
    /// with an additional regex and not-equal matcher.
    #[test]
    fn test_query_to_plan() {
        let schema = Arc::new(Schema::new(vec![
            ColumnSchema::new(
                GREPTIME_TIMESTAMP,
                ConcreteDataType::timestamp_millisecond_datatype(),
                true,
            ),
            ColumnSchema::new(GREPTIME_VALUE, ConcreteDataType::float64_datatype(), true),
            ColumnSchema::new("instance", ConcreteDataType::string_datatype(), true),
            ColumnSchema::new("job", ConcreteDataType::string_datatype(), true),
        ]));
        let recordbatch = RecordBatch::new(
            schema,
            vec![
                Arc::new(TimestampMillisecondVector::from_vec(vec![1000])) as _,
                Arc::new(Float64Vector::from_vec(vec![3.0])) as _,
                Arc::new(StringVector::from(vec!["host1"])) as _,
                Arc::new(StringVector::from(vec!["job"])) as _,
            ],
        )
        .unwrap();

        let ctx = SessionContext::new();
        let table = MemTable::table("test", recordbatch);
        let table_provider = Arc::new(DfTableProviderAdapter::new(table));

        // Plans `q` against a fresh dataframe and renders the indented plan.
        let render_plan = |q: &Query| {
            let dataframe = ctx.read_table(table_provider.clone()).unwrap();
            let plan = query_to_plan(DataFrame::DataFusion(dataframe), q).unwrap();
            format!("{}", plan.display_indent())
        };

        let name_only = query_with(vec![matcher(METRIC_NAME_LABEL, "test", EQ_TYPE)]);
        let display_string = render_plan(&name_only);

        assert_eq!("Filter: ?table?.greptime_timestamp >= TimestampMillisecond(1000, None) AND ?table?.greptime_timestamp <= TimestampMillisecond(2000, None)\n TableScan: ?table?", display_string);

        let with_labels = query_with(vec![
            matcher(METRIC_NAME_LABEL, "test", EQ_TYPE),
            matcher("job", "*prom*", RE_TYPE),
            matcher("instance", "localhost", NEQ_TYPE),
        ]);
        let display_string = render_plan(&with_labels);

        assert_eq!("Filter: ?table?.greptime_timestamp >= TimestampMillisecond(1000, None) AND ?table?.greptime_timestamp <= TimestampMillisecond(2000, None) AND regexp_match(?table?.job, Utf8(\"*prom*\")) IS NOT NULL AND ?table?.instance != Utf8(\"localhost\")\n TableScan: ?table?", display_string);
    }

    /// Builds the expected column schemas: the given tag columns followed by
    /// the `greptime_value` and `greptime_timestamp` columns.
    fn column_schemas_with(
        kts_iter: Vec<(&str, ColumnDataType, SemanticType)>,
    ) -> Vec<api::v1::ColumnSchema> {
        let trailing_columns = [
            (
                "greptime_value",
                ColumnDataType::Float64,
                SemanticType::Field,
            ),
            (
                "greptime_timestamp",
                ColumnDataType::TimestampMillisecond,
                SemanticType::Timestamp,
            ),
        ];

        kts_iter
            .into_iter()
            .chain(trailing_columns)
            .map(|(name, datatype, semantic_type)| api::v1::ColumnSchema {
                column_name: name.to_string(),
                datatype: datatype as i32,
                semantic_type: semantic_type as i32,
                ..Default::default()
            })
            .collect()
    }

    /// Builds a row holding the given string labels followed by the f64
    /// value and the millisecond timestamp.
    fn make_row(labels: &[&str], value: f64, timestamp: i64) -> Row {
        use api::v1::value::ValueData;

        let values = labels
            .iter()
            .map(|label| ValueData::StringValue(label.to_string()))
            .chain([
                ValueData::F64Value(value),
                ValueData::TimestampMillisecondValue(timestamp),
            ])
            .map(|data| api::v1::Value {
                value_data: Some(data),
            })
            .collect();

        Row { values }
    }

    /// Expected row for a series with a single tag.
    fn make_row_with_label(l1: &str, value: f64, timestamp: i64) -> Row {
        make_row(&[l1], value, timestamp)
    }

    /// Expected row for a series with two tags.
    fn make_row_with_2_labels(l1: &str, l2: &str, value: f64, timestamp: i64) -> Row {
        make_row(&[l1, l2], value, timestamp)
    }

    /// Converts the mock series and checks table names, schemas and rows of
    /// the resulting insert requests.
    #[test]
    fn test_write_request_to_row_insert_exprs() {
        let write_request = WriteRequest {
            timeseries: mock_timeseries(),
            ..Default::default()
        };

        let mut inserts = to_grpc_row_insert_requests(&write_request)
            .unwrap()
            .0
            .inserts;
        inserts.sort_unstable_by(|l, r| l.table_name.cmp(&r.table_name));

        // (table name, tag columns, expected rows), sorted by table name.
        let expectations = vec![
            (
                "metric1",
                vec![("job", ColumnDataType::String, SemanticType::Tag)],
                vec![
                    make_row_with_label("spark", 1.0, 1000),
                    make_row_with_label("spark", 2.0, 2000),
                ],
            ),
            (
                "metric2",
                vec![
                    ("instance", ColumnDataType::String, SemanticType::Tag),
                    ("idc", ColumnDataType::String, SemanticType::Tag),
                ],
                vec![
                    make_row_with_2_labels("test_host1", "z001", 3.0, 1000),
                    make_row_with_2_labels("test_host1", "z001", 4.0, 2000),
                ],
            ),
            (
                "metric3",
                vec![
                    ("idc", ColumnDataType::String, SemanticType::Tag),
                    ("app", ColumnDataType::String, SemanticType::Tag),
                ],
                vec![
                    make_row_with_2_labels("z002", "biz", 5.0, 1000),
                    make_row_with_2_labels("z002", "biz", 6.0, 2000),
                    make_row_with_2_labels("z002", "biz", 7.0, 3000),
                ],
            ),
        ];
        assert_eq!(3, inserts.len());
        assert_eq!(expectations.len(), inserts.len());

        for (insert, (table, tags, expected_rows)) in inserts.iter().zip(expectations) {
            assert_eq!(table, insert.table_name);

            let rows = insert.rows.as_ref().unwrap();
            assert_eq!(expected_rows.len(), rows.rows.len());
            // Tag columns plus the value and timestamp columns.
            assert_eq!(tags.len() + 2, rows.schema.len());
            assert_eq!(column_schemas_with(tags), rows.schema);
            assert_eq!(expected_rows, rows.rows);
        }
    }

    /// Two single-row batches with different `instance` values yield two
    /// series, each with one sample.
    #[test]
    fn test_recordbatches_to_timeseries() {
        let schema = Arc::new(Schema::new(vec![
            ColumnSchema::new(
                GREPTIME_TIMESTAMP,
                ConcreteDataType::timestamp_millisecond_datatype(),
                true,
            ),
            ColumnSchema::new(GREPTIME_VALUE, ConcreteDataType::float64_datatype(), true),
            ColumnSchema::new("instance", ConcreteDataType::string_datatype(), true),
        ]));

        // Builds a one-row batch for the schema above.
        let single_row_batch = |timestamp: i64, value: f64, host: &str| {
            RecordBatch::new(
                schema.clone(),
                vec![
                    Arc::new(TimestampMillisecondVector::from_vec(vec![timestamp])) as _,
                    Arc::new(Float64Vector::from_vec(vec![value])) as _,
                    Arc::new(StringVector::from(vec![host])) as _,
                ],
            )
            .unwrap()
        };

        let recordbatches = RecordBatches::try_new(
            schema.clone(),
            vec![
                single_row_batch(1000, 3.0, "host1"),
                single_row_batch(2000, 7.0, "host2"),
            ],
        )
        .unwrap();

        let timeseries = recordbatches_to_timeseries("metric1", recordbatches).unwrap();
        assert_eq!(2, timeseries.len());

        let expected = [("host1", 3.0, 1000), ("host2", 7.0, 2000)];
        for (series, (host, value, timestamp)) in timeseries.iter().zip(expected) {
            assert_eq!(
                vec![
                    Label {
                        name: METRIC_NAME_LABEL.to_string(),
                        value: "metric1".to_string(),
                    },
                    Label {
                        name: "instance".to_string(),
                        value: host.to_string(),
                    },
                ],
                series.labels
            );
            assert_eq!(series.samples, vec![Sample { value, timestamp }]);
        }
    }
}