use std::cmp::Ordering;
use std::collections::BTreeMap;
use std::hash::{Hash, Hasher};

use api::prom_store::remote::label_matcher::Type as MatcherType;
use api::prom_store::remote::{Label, Query, ReadRequest, Sample, TimeSeries, WriteRequest};
use api::v1::RowInsertRequests;
use common_grpc::precision::Precision;
use common_query::prelude::{GREPTIME_TIMESTAMP, GREPTIME_VALUE};
use common_recordbatch::{RecordBatch, RecordBatches};
use common_telemetry::tracing;
use common_time::timestamp::TimeUnit;
use datafusion::prelude::{col, lit, regexp_match, Expr};
use datafusion_common::ScalarValue;
use datafusion_expr::LogicalPlan;
use datatypes::prelude::{ConcreteDataType, Value};
use openmetrics_parser::{MetricsExposition, PrometheusType, PrometheusValue};
use query::dataframe::DataFrame;
use snafu::{ensure, OptionExt, ResultExt};
use snap::raw::{Decoder, Encoder};

use crate::error::{self, Result};
use crate::row_writer::{self, MultiTableData};

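/// Prometheus reserved label that carries the metric name.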
pub const METRIC_NAME_LABEL: &str = "__name__";
pub const METRIC_NAME_LABEL_BYTES: &[u8] = b"__name__";

pub const DATABASE_LABEL: &str = "__database__";
pub const DATABASE_LABEL_BYTES: &[u8] = b"__database__";

pub const SCHEMA_LABEL: &str = "__schema__";
pub const SCHEMA_LABEL_BYTES: &[u8] = b"__schema__";

pub const PHYSICAL_TABLE_LABEL: &str = "__physical_table__";
pub const PHYSICAL_TABLE_LABEL_BYTES: &[u8] = b"__physical_table__";

pub const FIELD_NAME_LABEL: &str = "__field__";

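/// Metrics parsed from an OpenMetrics / Prometheus text exposition.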
pub struct Metrics {
    pub exposition: MetricsExposition<PrometheusType, PrometheusValue>,
}

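/// Extracts the target table name from a remote read [`Query`] by looking up
/// its `__name__` matcher; returns an `InvalidPromRemoteRequest` error when the
/// label is absent.
///
/// A minimal sketch of the lookup, mirroring the `test_table_name` unit test
/// below (marked `ignore`; names resolve within this module):
///
/// ```ignore
/// use api::prom_store::remote::label_matcher::Type as MatcherType;
/// use api::prom_store::remote::{LabelMatcher, Query};
///
/// let q = Query {
///     matchers: vec![LabelMatcher {
///         name: METRIC_NAME_LABEL.to_string(),
///         value: "test".to_string(),
///         r#type: MatcherType::Eq as i32,
///     }],
///     ..Default::default()
/// };
/// assert_eq!("test", table_name(&q).unwrap());
/// ```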
pub fn table_name(q: &Query) -> Result<String> {
    let label_matches = &q.matchers;

    label_matches
        .iter()
        .find_map(|m| {
            if m.name == METRIC_NAME_LABEL {
                Some(m.value.to_string())
            } else {
                None
            }
        })
        .context(error::InvalidPromRemoteRequestSnafu {
            msg: "missing '__name__' label in timeseries",
        })
}

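/// Finds the schema (database) hint in a remote read request: an equality
/// matcher on `__schema__` takes precedence, falling back to `__database__`.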
pub fn extract_schema_from_read_request(request: &ReadRequest) -> Option<String> {
    for query in &request.queries {
        for matcher in &query.matchers {
            if matcher.name == SCHEMA_LABEL && matcher.r#type == MatcherType::Eq as i32 {
                return Some(matcher.value.clone());
            }
        }
    }

    for query in &request.queries {
        for matcher in &query.matchers {
            if matcher.name == DATABASE_LABEL && matcher.r#type == MatcherType::Eq as i32 {
                return Some(matcher.value.clone());
            }
        }
    }

    None
}

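/// Builds a DataFusion [`LogicalPlan`] for a remote read [`Query`]: the time
/// range becomes `greptime_timestamp >= start AND greptime_timestamp <= end`,
/// equality and inequality matchers become column predicates, and regex
/// matchers are expressed as `regexp_match(col, pattern) IS (NOT) NULL`.
/// The `__name__`, `__schema__` and `__database__` matchers are skipped here
/// because they select the table and schema rather than filter rows.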
#[tracing::instrument(skip_all)]
pub fn query_to_plan(dataframe: DataFrame, q: &Query) -> Result<LogicalPlan> {
    let DataFrame::DataFusion(dataframe) = dataframe;

    let start_timestamp_ms = q.start_timestamp_ms;
    let end_timestamp_ms = q.end_timestamp_ms;

    let label_matches = &q.matchers;

    let mut conditions = Vec::with_capacity(label_matches.len() + 1);

    conditions.push(col(GREPTIME_TIMESTAMP).gt_eq(lit_timestamp_millisecond(start_timestamp_ms)));
    conditions.push(col(GREPTIME_TIMESTAMP).lt_eq(lit_timestamp_millisecond(end_timestamp_ms)));

    for m in label_matches {
        let name = &m.name;

        if name == METRIC_NAME_LABEL || name == SCHEMA_LABEL || name == DATABASE_LABEL {
            continue;
        }

        let value = &m.value;
        let m_type = MatcherType::try_from(m.r#type).map_err(|e| {
            error::InvalidPromRemoteRequestSnafu {
                msg: format!("invalid LabelMatcher type, decode error: {e}"),
            }
            .build()
        })?;

        match m_type {
            MatcherType::Eq => {
                conditions.push(col(name).eq(lit(value)));
            }
            MatcherType::Neq => {
                conditions.push(col(name).not_eq(lit(value)));
            }
            MatcherType::Re => {
                conditions.push(regexp_match(col(name), lit(value), None).is_not_null());
            }
            MatcherType::Nre => {
                conditions.push(regexp_match(col(name), lit(value), None).is_null());
            }
        }
    }

    let conditions = conditions.into_iter().reduce(Expr::and).unwrap();

    let dataframe = dataframe
        .filter(conditions)
        .context(error::DataFrameSnafu)?;

    Ok(dataframe.into_parts().1)
}

#[inline]
fn new_label(name: String, value: String) -> Label {
    Label { name, value }
}

fn lit_timestamp_millisecond(ts: i64) -> Expr {
    Expr::Literal(ScalarValue::TimestampMillisecond(Some(ts), None), None)
}

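/// A time series identified by its label set. `Hash`, `Eq` and `Ord` are
/// implemented over the labels so the id can key the `BTreeMap` used in
/// `recordbatch_to_timeseries`, which keeps the emitted series in a stable order.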
#[derive(Debug)]
struct TimeSeriesId {
    labels: Vec<Label>,
}

impl PartialEq for TimeSeriesId {
    fn eq(&self, other: &Self) -> bool {
        if self.labels.len() != other.labels.len() {
            return false;
        }

        self.labels
            .iter()
            .zip(other.labels.iter())
            .all(|(l, r)| l.name == r.name && l.value == r.value)
    }
}
impl Eq for TimeSeriesId {}

impl Hash for TimeSeriesId {
    fn hash<H: Hasher>(&self, state: &mut H) {
        for label in &self.labels {
            label.name.hash(state);
            label.value.hash(state);
        }
    }
}

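// Compare by label count first, then lexicographically by (name, value).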
impl Ord for TimeSeriesId {
    fn cmp(&self, other: &Self) -> Ordering {
        let ordering = self.labels.len().cmp(&other.labels.len());
        if ordering != Ordering::Equal {
            return ordering;
        }

        for (l, r) in self.labels.iter().zip(other.labels.iter()) {
            let ordering = l.name.cmp(&r.name);

            if ordering != Ordering::Equal {
                return ordering;
            }

            let ordering = l.value.cmp(&r.value);

            if ordering != Ordering::Equal {
                return ordering;
            }
        }
        Ordering::Equal
    }
}

impl PartialOrd for TimeSeriesId {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

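/// Builds one [`TimeSeriesId`] per row of the record batch from its tag
/// columns, always including the `__name__` label; null tag values are skipped.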
fn collect_timeseries_ids(table_name: &str, recordbatch: &RecordBatch) -> Vec<TimeSeriesId> {
    let row_count = recordbatch.num_rows();
    let mut timeseries_ids = Vec::with_capacity(row_count);

    for row in 0..row_count {
        let mut labels = Vec::with_capacity(recordbatch.num_columns() - 1);
        labels.push(new_label(
            METRIC_NAME_LABEL.to_string(),
            table_name.to_string(),
        ));

        for (i, column_schema) in recordbatch.schema.column_schemas().iter().enumerate() {
            if column_schema.name == GREPTIME_VALUE || column_schema.name == GREPTIME_TIMESTAMP {
                continue;
            }

            let column = &recordbatch.columns()[i];
            if column.is_null(row) {
                continue;
            }

            let value = column.get(row).to_string();
            labels.push(new_label(column_schema.name.clone(), value));
        }
        timeseries_ids.push(TimeSeriesId { labels });
    }
    timeseries_ids
}

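/// Converts query result [`RecordBatches`] into Prometheus remote read
/// [`TimeSeries`], one group of samples per distinct label set.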
pub fn recordbatches_to_timeseries(
    table_name: &str,
    recordbatches: RecordBatches,
) -> Result<Vec<TimeSeries>> {
    Ok(recordbatches
        .take()
        .into_iter()
        .map(|x| recordbatch_to_timeseries(table_name, x))
        .collect::<Result<Vec<_>>>()?
        .into_iter()
        .flatten()
        .collect())
}

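/// Converts a single [`RecordBatch`] into [`TimeSeries`]. The batch must carry
/// a `greptime_timestamp` column of type Timestamp(Millisecond) and a
/// `greptime_value` column of type Float64; rows are grouped by their label set,
/// and rows with null timestamps or values are dropped.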
fn recordbatch_to_timeseries(table: &str, recordbatch: RecordBatch) -> Result<Vec<TimeSeries>> {
    let ts_column = recordbatch.column_by_name(GREPTIME_TIMESTAMP).context(
        error::InvalidPromRemoteReadQueryResultSnafu {
            msg: "missing greptime_timestamp column in query result",
        },
    )?;
    ensure!(
        ts_column.data_type() == ConcreteDataType::timestamp_millisecond_datatype(),
        error::InvalidPromRemoteReadQueryResultSnafu {
            msg: format!(
                "Expect timestamp column of datatype Timestamp(Millisecond), actual {:?}",
                ts_column.data_type()
            )
        }
    );

    let field_column = recordbatch.column_by_name(GREPTIME_VALUE).context(
        error::InvalidPromRemoteReadQueryResultSnafu {
            msg: "missing greptime_value column in query result",
        },
    )?;
    ensure!(
        field_column.data_type() == ConcreteDataType::float64_datatype(),
        error::InvalidPromRemoteReadQueryResultSnafu {
            msg: format!(
                "Expect value column of datatype Float64, actual {:?}",
                field_column.data_type()
            )
        }
    );

    let timeseries_ids = collect_timeseries_ids(table, &recordbatch);
    let mut timeseries_map: BTreeMap<&TimeSeriesId, TimeSeries> = BTreeMap::default();

    for (row, timeseries_id) in timeseries_ids.iter().enumerate() {
        let timeseries = timeseries_map
            .entry(timeseries_id)
            .or_insert_with(|| TimeSeries {
                labels: timeseries_id.labels.clone(),
                ..Default::default()
            });

        if ts_column.is_null(row) || field_column.is_null(row) {
            continue;
        }

        let value: f64 = match field_column.get(row) {
            Value::Float64(value) => value.into(),
            _ => unreachable!("checked by the \"ensure\" above"),
        };
        let timestamp = match ts_column.get(row) {
            Value::Timestamp(t) if t.unit() == TimeUnit::Millisecond => t.value(),
            _ => unreachable!("checked by the \"ensure\" above"),
        };
        let sample = Sample { value, timestamp };

        timeseries.samples.push(sample);
    }

    Ok(timeseries_map.into_values().collect())
}

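/// Converts a Prometheus remote write request into per-table gRPC row insert
/// requests. Each time series is routed to the table named by its `__name__`
/// label; the remaining labels become tag columns and every sample becomes one
/// row with `greptime_value` and `greptime_timestamp` columns.
///
/// A minimal sketch of the conversion, mirroring the
/// `test_write_request_to_row_insert_exprs` unit test below (marked `ignore`;
/// names resolve within this module):
///
/// ```ignore
/// use api::prom_store::remote::WriteRequest;
///
/// let write_request = WriteRequest {
///     timeseries: mock_timeseries(),
///     ..Default::default()
/// };
/// let (requests, _) = to_grpc_row_insert_requests(&write_request).unwrap();
/// assert_eq!(3, requests.inserts.len()); // one insert per metric table
/// ```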
pub fn to_grpc_row_insert_requests(request: &WriteRequest) -> Result<(RowInsertRequests, usize)> {
    let _timer = crate::metrics::METRIC_HTTP_PROM_STORE_CONVERT_ELAPSED.start_timer();

    let mut multi_table_data = MultiTableData::new();

    for series in &request.timeseries {
        let table_name = &series
            .labels
            .iter()
            .find(|label| label.name == METRIC_NAME_LABEL)
            .context(error::InvalidPromRemoteRequestSnafu {
                msg: "missing '__name__' label in time-series",
            })?
            .value;

        let num_columns = series.labels.len() + 1;

        let table_data = multi_table_data.get_or_default_table_data(
            table_name,
            num_columns,
            series.samples.len(),
        );

        let kvs = series.labels.iter().filter_map(|label| {
            if label.name == METRIC_NAME_LABEL {
                None
            } else {
                Some((label.name.clone(), label.value.clone()))
            }
        });

        if series.samples.len() == 1 {
            let mut one_row = table_data.alloc_one_row();

            row_writer::write_tags(table_data, kvs, &mut one_row)?;
            row_writer::write_f64(
                table_data,
                GREPTIME_VALUE,
                series.samples[0].value,
                &mut one_row,
            )?;
            row_writer::write_ts_to_millis(
                table_data,
                GREPTIME_TIMESTAMP,
                Some(series.samples[0].timestamp),
                Precision::Millisecond,
                &mut one_row,
            )?;

            table_data.add_row(one_row);
        } else {
            for Sample { value, timestamp } in &series.samples {
                let mut one_row = table_data.alloc_one_row();

                let kvs = kvs.clone();
                row_writer::write_tags(table_data, kvs, &mut one_row)?;
                row_writer::write_f64(table_data, GREPTIME_VALUE, *value, &mut one_row)?;
                row_writer::write_ts_to_millis(
                    table_data,
                    GREPTIME_TIMESTAMP,
                    Some(*timestamp),
                    Precision::Millisecond,
                    &mut one_row,
                )?;

                table_data.add_row(one_row);
            }
        }
    }

    Ok(multi_table_data.into_row_insert_requests())
}

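/// Decompresses a snappy "raw" (block format) payload, as used by the
/// Prometheus remote write/read protocol.
///
/// A minimal round-trip sketch with the sibling `snappy_compress` helper
/// (marked `ignore`; names resolve within this module):
///
/// ```ignore
/// let payload = b"prometheus remote write body";
/// let compressed = snappy_compress(payload).unwrap();
/// assert_eq!(payload.to_vec(), snappy_decompress(&compressed).unwrap());
/// ```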
#[inline]
pub fn snappy_decompress(buf: &[u8]) -> Result<Vec<u8>> {
    let mut decoder = Decoder::new();
    decoder
        .decompress_vec(buf)
        .context(error::DecompressSnappyPromRemoteRequestSnafu)
}

#[inline]
pub fn snappy_compress(buf: &[u8]) -> Result<Vec<u8>> {
    let mut encoder = Encoder::new();
    encoder
        .compress_vec(buf)
        .context(error::CompressPromRemoteRequestSnafu)
}

#[inline]
pub fn zstd_decompress(buf: &[u8]) -> Result<Vec<u8>> {
    zstd::stream::decode_all(buf).context(error::DecompressZstdPromRemoteRequestSnafu)
}

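/// Test helper: builds three mock time series ("metric1", "metric2", "metric3")
/// used by the unit tests below.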
pub fn mock_timeseries() -> Vec<TimeSeries> {
    vec![
        TimeSeries {
            labels: vec![
                new_label(METRIC_NAME_LABEL.to_string(), "metric1".to_string()),
                new_label("job".to_string(), "spark".to_string()),
            ],
            samples: vec![
                Sample {
                    value: 1.0f64,
                    timestamp: 1000,
                },
                Sample {
                    value: 2.0f64,
                    timestamp: 2000,
                },
            ],
            ..Default::default()
        },
        TimeSeries {
            labels: vec![
                new_label(METRIC_NAME_LABEL.to_string(), "metric2".to_string()),
                new_label("instance".to_string(), "test_host1".to_string()),
                new_label("idc".to_string(), "z001".to_string()),
            ],
            samples: vec![
                Sample {
                    value: 3.0f64,
                    timestamp: 1000,
                },
                Sample {
                    value: 4.0f64,
                    timestamp: 2000,
                },
            ],
            ..Default::default()
        },
        TimeSeries {
            labels: vec![
                new_label(METRIC_NAME_LABEL.to_string(), "metric3".to_string()),
                new_label("idc".to_string(), "z002".to_string()),
                new_label("app".to_string(), "biz".to_string()),
            ],
            samples: vec![
                Sample {
                    value: 5.0f64,
                    timestamp: 1000,
                },
                Sample {
                    value: 6.0f64,
                    timestamp: 2000,
                },
                Sample {
                    value: 7.0f64,
                    timestamp: 3000,
                },
            ],
            ..Default::default()
        },
    ]
}

pub fn mock_timeseries_new_label() -> Vec<TimeSeries> {
    let ts_demo_metrics = TimeSeries {
        labels: vec![
            new_label(METRIC_NAME_LABEL.to_string(), "demo_metrics".to_string()),
            new_label("idc".to_string(), "idc3".to_string()),
            new_label("new_label1".to_string(), "foo".to_string()),
        ],
        samples: vec![Sample {
            value: 42.0,
            timestamp: 3000,
        }],
        ..Default::default()
    };
    let ts_multi_labels = TimeSeries {
        labels: vec![
            new_label(METRIC_NAME_LABEL.to_string(), "metric1".to_string()),
            new_label("idc".to_string(), "idc4".to_string()),
            new_label("env".to_string(), "prod".to_string()),
            new_label("host".to_string(), "host9".to_string()),
            new_label("new_label2".to_string(), "bar".to_string()),
        ],
        samples: vec![Sample {
            value: 99.0,
            timestamp: 4000,
        }],
        ..Default::default()
    };

    vec![ts_demo_metrics, ts_multi_labels]
}

pub fn mock_timeseries_special_labels() -> Vec<TimeSeries> {
    let idc3_schema = TimeSeries {
        labels: vec![
            new_label(METRIC_NAME_LABEL.to_string(), "idc3_lo_table".to_string()),
            new_label("__database__".to_string(), "idc3".to_string()),
            new_label("__physical_table__".to_string(), "f1".to_string()),
        ],
        samples: vec![Sample {
            value: 42.0,
            timestamp: 3000,
        }],
        ..Default::default()
    };
    let idc4_schema = TimeSeries {
        labels: vec![
            new_label(
                METRIC_NAME_LABEL.to_string(),
                "idc4_local_table".to_string(),
            ),
            new_label("__database__".to_string(), "idc4".to_string()),
            new_label("__physical_table__".to_string(), "f2".to_string()),
        ],
        samples: vec![Sample {
            value: 99.0,
            timestamp: 4000,
        }],
        ..Default::default()
    };

    vec![idc3_schema, idc4_schema]
}

#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use api::prom_store::remote::LabelMatcher;
    use api::v1::{ColumnDataType, Row, SemanticType};
    use datafusion::prelude::SessionContext;
    use datatypes::schema::{ColumnSchema, Schema};
    use datatypes::vectors::{Float64Vector, StringVector, TimestampMillisecondVector};
    use table::table::adapter::DfTableProviderAdapter;
    use table::test_util::MemTable;

    use super::*;

    const EQ_TYPE: i32 = MatcherType::Eq as i32;
    const NEQ_TYPE: i32 = MatcherType::Neq as i32;
    const RE_TYPE: i32 = MatcherType::Re as i32;

    #[test]
    fn test_table_name() {
        let q = Query {
            start_timestamp_ms: 1000,
            end_timestamp_ms: 2000,
            matchers: vec![],
            ..Default::default()
        };
        let err = table_name(&q).unwrap_err();
        assert!(matches!(err, error::Error::InvalidPromRemoteRequest { .. }));

        let q = Query {
            start_timestamp_ms: 1000,
            end_timestamp_ms: 2000,
            matchers: vec![LabelMatcher {
                name: METRIC_NAME_LABEL.to_string(),
                value: "test".to_string(),
                r#type: EQ_TYPE,
            }],
            ..Default::default()
        };
        assert_eq!("test", table_name(&q).unwrap());
    }

    #[test]
    fn test_query_to_plan() {
        let q = Query {
            start_timestamp_ms: 1000,
            end_timestamp_ms: 2000,
            matchers: vec![LabelMatcher {
                name: METRIC_NAME_LABEL.to_string(),
                value: "test".to_string(),
                r#type: EQ_TYPE,
            }],
            ..Default::default()
        };

        let schema = Arc::new(Schema::new(vec![
            ColumnSchema::new(
                GREPTIME_TIMESTAMP,
                ConcreteDataType::timestamp_millisecond_datatype(),
                true,
            ),
            ColumnSchema::new(GREPTIME_VALUE, ConcreteDataType::float64_datatype(), true),
            ColumnSchema::new("instance", ConcreteDataType::string_datatype(), true),
            ColumnSchema::new("job", ConcreteDataType::string_datatype(), true),
        ]));
        let recordbatch = RecordBatch::new(
            schema,
            vec![
                Arc::new(TimestampMillisecondVector::from_vec(vec![1000])) as _,
                Arc::new(Float64Vector::from_vec(vec![3.0])) as _,
                Arc::new(StringVector::from(vec!["host1"])) as _,
                Arc::new(StringVector::from(vec!["job"])) as _,
            ],
        )
        .unwrap();

        let ctx = SessionContext::new();
        let table = MemTable::table("test", recordbatch);
        let table_provider = Arc::new(DfTableProviderAdapter::new(table));

        let dataframe = ctx.read_table(table_provider.clone()).unwrap();
        let plan = query_to_plan(DataFrame::DataFusion(dataframe), &q).unwrap();
        let display_string = format!("{}", plan.display_indent());

        assert_eq!("Filter: ?table?.greptime_timestamp >= TimestampMillisecond(1000, None) AND ?table?.greptime_timestamp <= TimestampMillisecond(2000, None)\n  TableScan: ?table?", display_string);

        let q = Query {
            start_timestamp_ms: 1000,
            end_timestamp_ms: 2000,
            matchers: vec![
                LabelMatcher {
                    name: METRIC_NAME_LABEL.to_string(),
                    value: "test".to_string(),
                    r#type: EQ_TYPE,
                },
                LabelMatcher {
                    name: "job".to_string(),
                    value: "*prom*".to_string(),
                    r#type: RE_TYPE,
                },
                LabelMatcher {
                    name: "instance".to_string(),
                    value: "localhost".to_string(),
                    r#type: NEQ_TYPE,
                },
            ],
            ..Default::default()
        };

        let dataframe = ctx.read_table(table_provider).unwrap();
        let plan = query_to_plan(DataFrame::DataFusion(dataframe), &q).unwrap();
        let display_string = format!("{}", plan.display_indent());

        assert_eq!("Filter: ?table?.greptime_timestamp >= TimestampMillisecond(1000, None) AND ?table?.greptime_timestamp <= TimestampMillisecond(2000, None) AND regexp_match(?table?.job, Utf8(\"*prom*\")) IS NOT NULL AND ?table?.instance != Utf8(\"localhost\")\n  TableScan: ?table?", display_string);
    }

    fn column_schemas_with(
        mut kts_iter: Vec<(&str, ColumnDataType, SemanticType)>,
    ) -> Vec<api::v1::ColumnSchema> {
        kts_iter.push((
            "greptime_value",
            ColumnDataType::Float64,
            SemanticType::Field,
        ));
        kts_iter.push((
            "greptime_timestamp",
            ColumnDataType::TimestampMillisecond,
            SemanticType::Timestamp,
        ));

        kts_iter
            .into_iter()
            .map(|(k, t, s)| api::v1::ColumnSchema {
                column_name: k.to_string(),
                datatype: t as i32,
                semantic_type: s as i32,
                ..Default::default()
            })
            .collect()
    }

    fn make_row_with_label(l1: &str, value: f64, timestamp: i64) -> Row {
        Row {
            values: vec![
                api::v1::Value {
                    value_data: Some(api::v1::value::ValueData::StringValue(l1.to_string())),
                },
                api::v1::Value {
                    value_data: Some(api::v1::value::ValueData::F64Value(value)),
                },
                api::v1::Value {
                    value_data: Some(api::v1::value::ValueData::TimestampMillisecondValue(
                        timestamp,
                    )),
                },
            ],
        }
    }

    fn make_row_with_2_labels(l1: &str, l2: &str, value: f64, timestamp: i64) -> Row {
        Row {
            values: vec![
                api::v1::Value {
                    value_data: Some(api::v1::value::ValueData::StringValue(l1.to_string())),
                },
                api::v1::Value {
                    value_data: Some(api::v1::value::ValueData::StringValue(l2.to_string())),
                },
                api::v1::Value {
                    value_data: Some(api::v1::value::ValueData::F64Value(value)),
                },
                api::v1::Value {
                    value_data: Some(api::v1::value::ValueData::TimestampMillisecondValue(
                        timestamp,
                    )),
                },
            ],
        }
    }

    #[test]
    fn test_write_request_to_row_insert_exprs() {
        let write_request = WriteRequest {
            timeseries: mock_timeseries(),
            ..Default::default()
        };

        let mut exprs = to_grpc_row_insert_requests(&write_request)
            .unwrap()
            .0
            .inserts;
        exprs.sort_unstable_by(|l, r| l.table_name.cmp(&r.table_name));
        assert_eq!(3, exprs.len());
        assert_eq!("metric1", exprs[0].table_name);
        assert_eq!("metric2", exprs[1].table_name);
        assert_eq!("metric3", exprs[2].table_name);

        let rows = exprs[0].rows.as_ref().unwrap();
        let schema = &rows.schema;
        let rows = &rows.rows;
        assert_eq!(2, rows.len());
        assert_eq!(3, schema.len());
        assert_eq!(
            column_schemas_with(vec![("job", ColumnDataType::String, SemanticType::Tag)]),
            *schema
        );
        assert_eq!(
            &vec![
                make_row_with_label("spark", 1.0, 1000),
                make_row_with_label("spark", 2.0, 2000),
            ],
            rows
        );

        let rows = exprs[1].rows.as_ref().unwrap();
        let schema = &rows.schema;
        let rows = &rows.rows;
        assert_eq!(2, rows.len());
        assert_eq!(4, schema.len());
        assert_eq!(
            column_schemas_with(vec![
                ("instance", ColumnDataType::String, SemanticType::Tag),
                ("idc", ColumnDataType::String, SemanticType::Tag)
            ]),
            *schema
        );
        assert_eq!(
            &vec![
                make_row_with_2_labels("test_host1", "z001", 3.0, 1000),
                make_row_with_2_labels("test_host1", "z001", 4.0, 2000),
            ],
            rows
        );

        let rows = exprs[2].rows.as_ref().unwrap();
        let schema = &rows.schema;
        let rows = &rows.rows;
        assert_eq!(3, rows.len());
        assert_eq!(4, schema.len());
        assert_eq!(
            column_schemas_with(vec![
                ("idc", ColumnDataType::String, SemanticType::Tag),
                ("app", ColumnDataType::String, SemanticType::Tag)
            ]),
            *schema
        );
        assert_eq!(
            &vec![
                make_row_with_2_labels("z002", "biz", 5.0, 1000),
                make_row_with_2_labels("z002", "biz", 6.0, 2000),
                make_row_with_2_labels("z002", "biz", 7.0, 3000),
            ],
            rows
        );
    }

    #[test]
    fn test_recordbatches_to_timeseries() {
        let schema = Arc::new(Schema::new(vec![
            ColumnSchema::new(
                GREPTIME_TIMESTAMP,
                ConcreteDataType::timestamp_millisecond_datatype(),
                true,
            ),
            ColumnSchema::new(GREPTIME_VALUE, ConcreteDataType::float64_datatype(), true),
            ColumnSchema::new("instance", ConcreteDataType::string_datatype(), true),
        ]));

        let recordbatches = RecordBatches::try_new(
            schema.clone(),
            vec![
                RecordBatch::new(
                    schema.clone(),
                    vec![
                        Arc::new(TimestampMillisecondVector::from_vec(vec![1000])) as _,
                        Arc::new(Float64Vector::from_vec(vec![3.0])) as _,
                        Arc::new(StringVector::from(vec!["host1"])) as _,
                    ],
                )
                .unwrap(),
                RecordBatch::new(
                    schema,
                    vec![
                        Arc::new(TimestampMillisecondVector::from_vec(vec![2000])) as _,
                        Arc::new(Float64Vector::from_vec(vec![7.0])) as _,
                        Arc::new(StringVector::from(vec!["host2"])) as _,
                    ],
                )
                .unwrap(),
            ],
        )
        .unwrap();

        let timeseries = recordbatches_to_timeseries("metric1", recordbatches).unwrap();
        assert_eq!(2, timeseries.len());

        assert_eq!(
            vec![
                Label {
                    name: METRIC_NAME_LABEL.to_string(),
                    value: "metric1".to_string(),
                },
                Label {
                    name: "instance".to_string(),
                    value: "host1".to_string(),
                },
            ],
            timeseries[0].labels
        );

        assert_eq!(
            timeseries[0].samples,
            vec![Sample {
                value: 3.0,
                timestamp: 1000,
            }]
        );

        assert_eq!(
            vec![
                Label {
                    name: METRIC_NAME_LABEL.to_string(),
                    value: "metric1".to_string(),
                },
                Label {
                    name: "instance".to_string(),
                    value: "host2".to_string(),
                },
            ],
            timeseries[1].labels
        );
        assert_eq!(
            timeseries[1].samples,
            vec![Sample {
                value: 7.0,
                timestamp: 2000,
            }]
        );
    }
}