1use std::cmp::Ordering;
18use std::collections::BTreeMap;
19use std::hash::{Hash, Hasher};
20
21use api::prom_store::remote::label_matcher::Type as MatcherType;
22use api::prom_store::remote::{Label, Query, Sample, TimeSeries, WriteRequest};
23use api::v1::RowInsertRequests;
24use common_grpc::precision::Precision;
25use common_query::prelude::{GREPTIME_TIMESTAMP, GREPTIME_VALUE};
26use common_recordbatch::{RecordBatch, RecordBatches};
27use common_telemetry::tracing;
28use common_time::timestamp::TimeUnit;
29use datafusion::prelude::{col, lit, regexp_match, Expr};
30use datafusion_common::ScalarValue;
31use datafusion_expr::LogicalPlan;
32use datatypes::prelude::{ConcreteDataType, Value};
33use openmetrics_parser::{MetricsExposition, PrometheusType, PrometheusValue};
34use query::dataframe::DataFrame;
35use snafu::{ensure, OptionExt, ResultExt};
36use snap::raw::{Decoder, Encoder};
37
38use crate::error::{self, Result};
39use crate::row_writer::{self, MultiTableData};
40
/// Name of the label that carries the metric name; this file uses its value as the table name.
pub const METRIC_NAME_LABEL: &str = "__name__";

/// Byte-string form of [`METRIC_NAME_LABEL`], derived from it so the two can never drift apart.
pub const METRIC_NAME_LABEL_BYTES: &[u8] = METRIC_NAME_LABEL.as_bytes();

/// Reserved label name `__field__`.
///
/// NOTE(review): not referenced elsewhere in this file; presumably it selects a field column —
/// confirm against callers.
pub const FIELD_NAME_LABEL: &str = "__field__";
47
48pub struct Metrics {
50 pub exposition: MetricsExposition<PrometheusType, PrometheusValue>,
51}
52
53pub fn table_name(q: &Query) -> Result<String> {
55 let label_matches = &q.matchers;
56
57 label_matches
58 .iter()
59 .find_map(|m| {
60 if m.name == METRIC_NAME_LABEL {
61 Some(m.value.to_string())
62 } else {
63 None
64 }
65 })
66 .context(error::InvalidPromRemoteRequestSnafu {
67 msg: "missing '__name__' label in timeseries",
68 })
69}
70
71#[tracing::instrument(skip_all)]
73pub fn query_to_plan(dataframe: DataFrame, q: &Query) -> Result<LogicalPlan> {
74 let DataFrame::DataFusion(dataframe) = dataframe;
75
76 let start_timestamp_ms = q.start_timestamp_ms;
77 let end_timestamp_ms = q.end_timestamp_ms;
78
79 let label_matches = &q.matchers;
80
81 let mut conditions = Vec::with_capacity(label_matches.len() + 1);
82
83 conditions.push(col(GREPTIME_TIMESTAMP).gt_eq(lit_timestamp_millisecond(start_timestamp_ms)));
84 conditions.push(col(GREPTIME_TIMESTAMP).lt_eq(lit_timestamp_millisecond(end_timestamp_ms)));
85
86 for m in label_matches {
87 let name = &m.name;
88
89 if name == METRIC_NAME_LABEL {
90 continue;
91 }
92
93 let value = &m.value;
94 let m_type = MatcherType::try_from(m.r#type).map_err(|e| {
95 error::InvalidPromRemoteRequestSnafu {
96 msg: format!("invalid LabelMatcher type, decode error: {e}",),
97 }
98 .build()
99 })?;
100
101 match m_type {
102 MatcherType::Eq => {
103 conditions.push(col(name).eq(lit(value)));
104 }
105 MatcherType::Neq => {
106 conditions.push(col(name).not_eq(lit(value)));
107 }
108 MatcherType::Re => {
110 conditions.push(regexp_match(col(name), lit(value), None).is_not_null());
111 }
112 MatcherType::Nre => {
114 conditions.push(regexp_match(col(name), lit(value), None).is_null());
115 }
116 }
117 }
118
119 let conditions = conditions.into_iter().reduce(Expr::and).unwrap();
121
122 let dataframe = dataframe
123 .filter(conditions)
124 .context(error::DataFrameSnafu)?;
125
126 Ok(dataframe.into_parts().1)
127}
128
129#[inline]
130fn new_label(name: String, value: String) -> Label {
131 Label { name, value }
132}
133
134fn lit_timestamp_millisecond(ts: i64) -> Expr {
135 Expr::Literal(ScalarValue::TimestampMillisecond(Some(ts), None))
136}
137
138#[derive(Debug)]
140struct TimeSeriesId {
141 labels: Vec<Label>,
142}
143
144impl PartialEq for TimeSeriesId {
146 fn eq(&self, other: &Self) -> bool {
147 if self.labels.len() != other.labels.len() {
148 return false;
149 }
150
151 self.labels
152 .iter()
153 .zip(other.labels.iter())
154 .all(|(l, r)| l.name == r.name && l.value == r.value)
155 }
156}
157impl Eq for TimeSeriesId {}
158
159impl Hash for TimeSeriesId {
160 fn hash<H: Hasher>(&self, state: &mut H) {
161 for label in &self.labels {
162 label.name.hash(state);
163 label.value.hash(state);
164 }
165 }
166}
167
168impl Ord for TimeSeriesId {
170 fn cmp(&self, other: &Self) -> Ordering {
171 let ordering = self.labels.len().cmp(&other.labels.len());
172 if ordering != Ordering::Equal {
173 return ordering;
174 }
175
176 for (l, r) in self.labels.iter().zip(other.labels.iter()) {
177 let ordering = l.name.cmp(&r.name);
178
179 if ordering != Ordering::Equal {
180 return ordering;
181 }
182
183 let ordering = l.value.cmp(&r.value);
184
185 if ordering != Ordering::Equal {
186 return ordering;
187 }
188 }
189 Ordering::Equal
190 }
191}
192
193impl PartialOrd for TimeSeriesId {
194 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
195 Some(self.cmp(other))
196 }
197}
198
199fn collect_timeseries_ids(table_name: &str, recordbatch: &RecordBatch) -> Vec<TimeSeriesId> {
202 let row_count = recordbatch.num_rows();
203 let mut timeseries_ids = Vec::with_capacity(row_count);
204
205 for row in 0..row_count {
206 let mut labels = Vec::with_capacity(recordbatch.num_columns() - 1);
207 labels.push(new_label(
208 METRIC_NAME_LABEL.to_string(),
209 table_name.to_string(),
210 ));
211
212 for (i, column_schema) in recordbatch.schema.column_schemas().iter().enumerate() {
213 if column_schema.name == GREPTIME_VALUE || column_schema.name == GREPTIME_TIMESTAMP {
214 continue;
215 }
216
217 let column = &recordbatch.columns()[i];
218 if column.is_null(row) {
220 continue;
221 }
222
223 let value = column.get(row).to_string();
224 labels.push(new_label(column_schema.name.clone(), value));
225 }
226 timeseries_ids.push(TimeSeriesId { labels });
227 }
228 timeseries_ids
229}
230
231pub fn recordbatches_to_timeseries(
232 table_name: &str,
233 recordbatches: RecordBatches,
234) -> Result<Vec<TimeSeries>> {
235 Ok(recordbatches
236 .take()
237 .into_iter()
238 .map(|x| recordbatch_to_timeseries(table_name, x))
239 .collect::<Result<Vec<_>>>()?
240 .into_iter()
241 .flatten()
242 .collect())
243}
244
245fn recordbatch_to_timeseries(table: &str, recordbatch: RecordBatch) -> Result<Vec<TimeSeries>> {
246 let ts_column = recordbatch.column_by_name(GREPTIME_TIMESTAMP).context(
247 error::InvalidPromRemoteReadQueryResultSnafu {
248 msg: "missing greptime_timestamp column in query result",
249 },
250 )?;
251 ensure!(
252 ts_column.data_type() == ConcreteDataType::timestamp_millisecond_datatype(),
253 error::InvalidPromRemoteReadQueryResultSnafu {
254 msg: format!(
255 "Expect timestamp column of datatype Timestamp(Millisecond), actual {:?}",
256 ts_column.data_type()
257 )
258 }
259 );
260
261 let field_column = recordbatch.column_by_name(GREPTIME_VALUE).context(
262 error::InvalidPromRemoteReadQueryResultSnafu {
263 msg: "missing greptime_value column in query result",
264 },
265 )?;
266 ensure!(
267 field_column.data_type() == ConcreteDataType::float64_datatype(),
268 error::InvalidPromRemoteReadQueryResultSnafu {
269 msg: format!(
270 "Expect value column of datatype Float64, actual {:?}",
271 field_column.data_type()
272 )
273 }
274 );
275
276 let timeseries_ids = collect_timeseries_ids(table, &recordbatch);
278 let mut timeseries_map: BTreeMap<&TimeSeriesId, TimeSeries> = BTreeMap::default();
280
281 for (row, timeseries_id) in timeseries_ids.iter().enumerate() {
282 let timeseries = timeseries_map
283 .entry(timeseries_id)
284 .or_insert_with(|| TimeSeries {
285 labels: timeseries_id.labels.clone(),
286 ..Default::default()
287 });
288
289 if ts_column.is_null(row) || field_column.is_null(row) {
290 continue;
291 }
292
293 let value: f64 = match field_column.get(row) {
294 Value::Float64(value) => value.into(),
295 _ => unreachable!("checked by the \"ensure\" above"),
296 };
297 let timestamp = match ts_column.get(row) {
298 Value::Timestamp(t) if t.unit() == TimeUnit::Millisecond => t.value(),
299 _ => unreachable!("checked by the \"ensure\" above"),
300 };
301 let sample = Sample { value, timestamp };
302
303 timeseries.samples.push(sample);
304 }
305
306 Ok(timeseries_map.into_values().collect())
307}
308
309pub fn to_grpc_row_insert_requests(request: &WriteRequest) -> Result<(RowInsertRequests, usize)> {
310 let _timer = crate::metrics::METRIC_HTTP_PROM_STORE_CONVERT_ELAPSED.start_timer();
311
312 let mut multi_table_data = MultiTableData::new();
313
314 for series in &request.timeseries {
315 let table_name = &series
316 .labels
317 .iter()
318 .find(|label| {
319 label.name == METRIC_NAME_LABEL
321 })
322 .context(error::InvalidPromRemoteRequestSnafu {
323 msg: "missing '__name__' label in time-series",
324 })?
325 .value;
326
327 let num_columns = series.labels.len() + 1;
330
331 let table_data = multi_table_data.get_or_default_table_data(
332 table_name,
333 num_columns,
334 series.samples.len(),
335 );
336
337 let kvs = series.labels.iter().filter_map(|label| {
339 if label.name == METRIC_NAME_LABEL {
340 None
341 } else {
342 Some((label.name.clone(), label.value.clone()))
343 }
344 });
345
346 if series.samples.len() == 1 {
347 let mut one_row = table_data.alloc_one_row();
348
349 row_writer::write_tags(table_data, kvs, &mut one_row)?;
350 row_writer::write_f64(
352 table_data,
353 GREPTIME_VALUE,
354 series.samples[0].value,
355 &mut one_row,
356 )?;
357 row_writer::write_ts_to_millis(
359 table_data,
360 GREPTIME_TIMESTAMP,
361 Some(series.samples[0].timestamp),
362 Precision::Millisecond,
363 &mut one_row,
364 )?;
365
366 table_data.add_row(one_row);
367 } else {
368 for Sample { value, timestamp } in &series.samples {
369 let mut one_row = table_data.alloc_one_row();
370
371 let kvs = kvs.clone();
373 row_writer::write_tags(table_data, kvs, &mut one_row)?;
374 row_writer::write_f64(table_data, GREPTIME_VALUE, *value, &mut one_row)?;
376 row_writer::write_ts_to_millis(
378 table_data,
379 GREPTIME_TIMESTAMP,
380 Some(*timestamp),
381 Precision::Millisecond,
382 &mut one_row,
383 )?;
384
385 table_data.add_row(one_row);
386 }
387 }
388 }
389
390 Ok(multi_table_data.into_row_insert_requests())
391}
392
393#[inline]
394pub fn snappy_decompress(buf: &[u8]) -> Result<Vec<u8>> {
395 let mut decoder = Decoder::new();
396 decoder
397 .decompress_vec(buf)
398 .context(error::DecompressSnappyPromRemoteRequestSnafu)
399}
400
401#[inline]
402pub fn snappy_compress(buf: &[u8]) -> Result<Vec<u8>> {
403 let mut encoder = Encoder::new();
404 encoder
405 .compress_vec(buf)
406 .context(error::CompressPromRemoteRequestSnafu)
407}
408
409#[inline]
410pub fn zstd_decompress(buf: &[u8]) -> Result<Vec<u8>> {
411 zstd::stream::decode_all(buf).context(error::DecompressZstdPromRemoteRequestSnafu)
412}
413
414pub fn mock_timeseries() -> Vec<TimeSeries> {
417 vec![
418 TimeSeries {
419 labels: vec![
420 new_label(METRIC_NAME_LABEL.to_string(), "metric1".to_string()),
421 new_label("job".to_string(), "spark".to_string()),
422 ],
423 samples: vec![
424 Sample {
425 value: 1.0f64,
426 timestamp: 1000,
427 },
428 Sample {
429 value: 2.0f64,
430 timestamp: 2000,
431 },
432 ],
433 ..Default::default()
434 },
435 TimeSeries {
436 labels: vec![
437 new_label(METRIC_NAME_LABEL.to_string(), "metric2".to_string()),
438 new_label("instance".to_string(), "test_host1".to_string()),
439 new_label("idc".to_string(), "z001".to_string()),
440 ],
441 samples: vec![
442 Sample {
443 value: 3.0f64,
444 timestamp: 1000,
445 },
446 Sample {
447 value: 4.0f64,
448 timestamp: 2000,
449 },
450 ],
451 ..Default::default()
452 },
453 TimeSeries {
454 labels: vec![
455 new_label(METRIC_NAME_LABEL.to_string(), "metric3".to_string()),
456 new_label("idc".to_string(), "z002".to_string()),
457 new_label("app".to_string(), "biz".to_string()),
458 ],
459 samples: vec![
460 Sample {
461 value: 5.0f64,
462 timestamp: 1000,
463 },
464 Sample {
465 value: 6.0f64,
466 timestamp: 2000,
467 },
468 Sample {
469 value: 7.0f64,
470 timestamp: 3000,
471 },
472 ],
473 ..Default::default()
474 },
475 ]
476}
477
478pub fn mock_timeseries_new_label() -> Vec<TimeSeries> {
480 let ts_demo_metrics = TimeSeries {
481 labels: vec![
482 new_label(METRIC_NAME_LABEL.to_string(), "demo_metrics".to_string()),
483 new_label("idc".to_string(), "idc3".to_string()),
484 new_label("new_label1".to_string(), "foo".to_string()),
485 ],
486 samples: vec![Sample {
487 value: 42.0,
488 timestamp: 3000,
489 }],
490 ..Default::default()
491 };
492 let ts_multi_labels = TimeSeries {
493 labels: vec![
494 new_label(METRIC_NAME_LABEL.to_string(), "metric1".to_string()),
495 new_label("idc".to_string(), "idc4".to_string()),
496 new_label("env".to_string(), "prod".to_string()),
497 new_label("host".to_string(), "host9".to_string()),
498 new_label("new_label2".to_string(), "bar".to_string()),
499 ],
500 samples: vec![Sample {
501 value: 99.0,
502 timestamp: 4000,
503 }],
504 ..Default::default()
505 };
506
507 vec![ts_demo_metrics, ts_multi_labels]
508}
509
510#[cfg(test)]
511mod tests {
512 use std::sync::Arc;
513
514 use api::prom_store::remote::LabelMatcher;
515 use api::v1::{ColumnDataType, Row, SemanticType};
516 use datafusion::prelude::SessionContext;
517 use datatypes::schema::{ColumnSchema, Schema};
518 use datatypes::vectors::{Float64Vector, StringVector, TimestampMillisecondVector};
519 use table::table::adapter::DfTableProviderAdapter;
520 use table::test_util::MemTable;
521
522 use super::*;
523
524 const EQ_TYPE: i32 = MatcherType::Eq as i32;
525 const NEQ_TYPE: i32 = MatcherType::Neq as i32;
526 const RE_TYPE: i32 = MatcherType::Re as i32;
527
/// `table_name` fails without a `__name__` matcher and returns its value otherwise.
#[test]
fn test_table_name() {
    // No matchers at all: the metric name cannot be resolved.
    let without_name = Query {
        start_timestamp_ms: 1000,
        end_timestamp_ms: 2000,
        matchers: vec![],
        ..Default::default()
    };
    let err = table_name(&without_name).unwrap_err();
    assert!(matches!(err, error::Error::InvalidPromRemoteRequest { .. }));

    // A `__name__` matcher supplies the table name.
    let name_matcher = LabelMatcher {
        name: METRIC_NAME_LABEL.to_string(),
        value: "test".to_string(),
        r#type: EQ_TYPE,
    };
    let with_name = Query {
        start_timestamp_ms: 1000,
        end_timestamp_ms: 2000,
        matchers: vec![name_matcher],
        ..Default::default()
    };
    assert_eq!("test", table_name(&with_name).unwrap());
}
551
/// `query_to_plan` always filters on the time range, skips the `__name__` matcher and appends
/// one condition per remaining matcher in request order.
#[test]
fn test_query_to_plan() {
    let schema = Arc::new(Schema::new(vec![
        ColumnSchema::new(
            GREPTIME_TIMESTAMP,
            ConcreteDataType::timestamp_millisecond_datatype(),
            true,
        ),
        ColumnSchema::new(GREPTIME_VALUE, ConcreteDataType::float64_datatype(), true),
        ColumnSchema::new("instance", ConcreteDataType::string_datatype(), true),
        ColumnSchema::new("job", ConcreteDataType::string_datatype(), true),
    ]));
    let recordbatch = RecordBatch::new(
        schema,
        vec![
            Arc::new(TimestampMillisecondVector::from_vec(vec![1000])) as _,
            Arc::new(Float64Vector::from_vec(vec![3.0])) as _,
            Arc::new(StringVector::from(vec!["host1"])) as _,
            Arc::new(StringVector::from(vec!["job"])) as _,
        ],
    )
    .unwrap();

    let ctx = SessionContext::new();
    let table = MemTable::table("test", recordbatch);
    let table_provider = Arc::new(DfTableProviderAdapter::new(table));

    // Plans a query over [1000, 2000] with the given matchers and renders the plan.
    let render_plan = |matchers: Vec<LabelMatcher>| -> String {
        let query = Query {
            start_timestamp_ms: 1000,
            end_timestamp_ms: 2000,
            matchers,
            ..Default::default()
        };
        let dataframe = ctx.read_table(table_provider.clone()).unwrap();
        let plan = query_to_plan(DataFrame::DataFusion(dataframe), &query).unwrap();
        format!("{}", plan.display_indent())
    };
    let matcher = |name: &str, value: &str, kind: i32| LabelMatcher {
        name: name.to_string(),
        value: value.to_string(),
        r#type: kind,
    };

    // Only the metric name: the filter holds nothing but the time range.
    let name_only = render_plan(vec![matcher(METRIC_NAME_LABEL, "test", EQ_TYPE)]);
    assert_eq!("Filter: ?table?.greptime_timestamp >= TimestampMillisecond(1000, None) AND ?table?.greptime_timestamp <= TimestampMillisecond(2000, None)\n TableScan: ?table?", name_only);

    // Additional matchers are appended after the time range, in request order.
    let with_labels = render_plan(vec![
        matcher(METRIC_NAME_LABEL, "test", EQ_TYPE),
        matcher("job", "*prom*", RE_TYPE),
        matcher("instance", "localhost", NEQ_TYPE),
    ]);
    assert_eq!("Filter: ?table?.greptime_timestamp >= TimestampMillisecond(1000, None) AND ?table?.greptime_timestamp <= TimestampMillisecond(2000, None) AND regexp_match(?table?.job, Utf8(\"*prom*\")) IS NOT NULL AND ?table?.instance != Utf8(\"localhost\")\n TableScan: ?table?", with_labels);
}
625
626 fn column_schemas_with(
627 mut kts_iter: Vec<(&str, ColumnDataType, SemanticType)>,
628 ) -> Vec<api::v1::ColumnSchema> {
629 kts_iter.push((
630 "greptime_value",
631 ColumnDataType::Float64,
632 SemanticType::Field,
633 ));
634 kts_iter.push((
635 "greptime_timestamp",
636 ColumnDataType::TimestampMillisecond,
637 SemanticType::Timestamp,
638 ));
639
640 kts_iter
641 .into_iter()
642 .map(|(k, t, s)| api::v1::ColumnSchema {
643 column_name: k.to_string(),
644 datatype: t as i32,
645 semantic_type: s as i32,
646 ..Default::default()
647 })
648 .collect()
649 }
650
651 fn make_row_with_label(l1: &str, value: f64, timestamp: i64) -> Row {
652 Row {
653 values: vec![
654 api::v1::Value {
655 value_data: Some(api::v1::value::ValueData::StringValue(l1.to_string())),
656 },
657 api::v1::Value {
658 value_data: Some(api::v1::value::ValueData::F64Value(value)),
659 },
660 api::v1::Value {
661 value_data: Some(api::v1::value::ValueData::TimestampMillisecondValue(
662 timestamp,
663 )),
664 },
665 ],
666 }
667 }
668
669 fn make_row_with_2_labels(l1: &str, l2: &str, value: f64, timestamp: i64) -> Row {
670 Row {
671 values: vec![
672 api::v1::Value {
673 value_data: Some(api::v1::value::ValueData::StringValue(l1.to_string())),
674 },
675 api::v1::Value {
676 value_data: Some(api::v1::value::ValueData::StringValue(l2.to_string())),
677 },
678 api::v1::Value {
679 value_data: Some(api::v1::value::ValueData::F64Value(value)),
680 },
681 api::v1::Value {
682 value_data: Some(api::v1::value::ValueData::TimestampMillisecondValue(
683 timestamp,
684 )),
685 },
686 ],
687 }
688 }
689
/// Converting the mock write request yields one insert per metric, each with the expected
/// schema (tags, value, timestamp) and one row per sample.
#[test]
fn test_write_request_to_row_insert_exprs() {
    let write_request = WriteRequest {
        timeseries: mock_timeseries(),
        ..Default::default()
    };

    let (requests, _) = to_grpc_row_insert_requests(&write_request).unwrap();
    let mut exprs = requests.inserts;
    // Sort by table name so the comparison does not depend on the order of the inserts.
    exprs.sort_unstable_by(|l, r| l.table_name.cmp(&r.table_name));
    assert_eq!(3, exprs.len());

    // (table name, expected schema, expected rows), sorted by table name.
    let expectations = vec![
        (
            "metric1",
            column_schemas_with(vec![("job", ColumnDataType::String, SemanticType::Tag)]),
            vec![
                make_row_with_label("spark", 1.0, 1000),
                make_row_with_label("spark", 2.0, 2000),
            ],
        ),
        (
            "metric2",
            column_schemas_with(vec![
                ("instance", ColumnDataType::String, SemanticType::Tag),
                ("idc", ColumnDataType::String, SemanticType::Tag),
            ]),
            vec![
                make_row_with_2_labels("test_host1", "z001", 3.0, 1000),
                make_row_with_2_labels("test_host1", "z001", 4.0, 2000),
            ],
        ),
        (
            "metric3",
            column_schemas_with(vec![
                ("idc", ColumnDataType::String, SemanticType::Tag),
                ("app", ColumnDataType::String, SemanticType::Tag),
            ]),
            vec![
                make_row_with_2_labels("z002", "biz", 5.0, 1000),
                make_row_with_2_labels("z002", "biz", 6.0, 2000),
                make_row_with_2_labels("z002", "biz", 7.0, 3000),
            ],
        ),
    ];
    assert_eq!(expectations.len(), exprs.len());

    for (expr, (table, expected_schema, expected_rows)) in exprs.iter().zip(&expectations) {
        assert_eq!(*table, expr.table_name);

        let actual = expr.rows.as_ref().unwrap();
        assert_eq!(expected_rows.len(), actual.rows.len());
        assert_eq!(expected_schema.len(), actual.schema.len());
        assert_eq!(*expected_schema, actual.schema);
        assert_eq!(expected_rows, &actual.rows);
    }
}
765
/// Two single-row batches with different `instance` values convert into two series, each
/// labelled with the metric name and the instance and holding exactly one sample.
#[test]
fn test_recordbatches_to_timeseries() {
    let schema = Arc::new(Schema::new(vec![
        ColumnSchema::new(
            GREPTIME_TIMESTAMP,
            ConcreteDataType::timestamp_millisecond_datatype(),
            true,
        ),
        ColumnSchema::new(GREPTIME_VALUE, ConcreteDataType::float64_datatype(), true),
        ColumnSchema::new("instance", ConcreteDataType::string_datatype(), true),
    ]));

    let first_batch = RecordBatch::new(
        schema.clone(),
        vec![
            Arc::new(TimestampMillisecondVector::from_vec(vec![1000])) as _,
            Arc::new(Float64Vector::from_vec(vec![3.0])) as _,
            Arc::new(StringVector::from(vec!["host1"])) as _,
        ],
    )
    .unwrap();
    let second_batch = RecordBatch::new(
        schema.clone(),
        vec![
            Arc::new(TimestampMillisecondVector::from_vec(vec![2000])) as _,
            Arc::new(Float64Vector::from_vec(vec![7.0])) as _,
            Arc::new(StringVector::from(vec!["host2"])) as _,
        ],
    )
    .unwrap();
    let recordbatches = RecordBatches::try_new(schema, vec![first_batch, second_batch]).unwrap();

    let timeseries = recordbatches_to_timeseries("metric1", recordbatches).unwrap();
    assert_eq!(2, timeseries.len());

    // (instance, sample value, sample timestamp) expected for each series, in output order.
    let expectations = [("host1", 3.0, 1000), ("host2", 7.0, 2000)];
    for (series, (instance, value, timestamp)) in timeseries.iter().zip(expectations) {
        assert_eq!(
            vec![
                Label {
                    name: METRIC_NAME_LABEL.to_string(),
                    value: "metric1".to_string(),
                },
                Label {
                    name: "instance".to_string(),
                    value: instance.to_string(),
                },
            ],
            series.labels
        );
        assert_eq!(series.samples, vec![Sample { value, timestamp }]);
    }
}
849}