use std::cmp::Ordering;
use std::collections::BTreeMap;
use std::hash::{Hash, Hasher};

use api::prom_store::remote::label_matcher::Type as MatcherType;
use api::prom_store::remote::{Label, Query, ReadRequest, Sample, TimeSeries, WriteRequest};
use api::v1::RowInsertRequests;
use arrow::array::{Array, AsArray};
use arrow::datatypes::{Float64Type, TimestampMillisecondType};
use common_grpc::precision::Precision;
use common_query::prelude::{greptime_timestamp, greptime_value};
use common_recordbatch::{RecordBatch, RecordBatches};
use common_telemetry::tracing;
use datafusion::prelude::{Expr, col, lit, regexp_match};
use datafusion_common::ScalarValue;
use datafusion_expr::LogicalPlan;
use openmetrics_parser::{MetricsExposition, PrometheusType, PrometheusValue};
use query::dataframe::DataFrame;
use snafu::{OptionExt, ResultExt};
use snap::raw::{Decoder, Encoder};

use crate::error::{self, Result};
use crate::row_writer::{self, MultiTableData};
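/// The reserved label whose value carries the metric name in Prometheus remote requests.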
pub const METRIC_NAME_LABEL: &str = "__name__";
pub const METRIC_NAME_LABEL_BYTES: &[u8] = b"__name__";

pub const DATABASE_LABEL: &str = "__database__";
pub const DATABASE_LABEL_BYTES: &[u8] = b"__database__";

pub const SCHEMA_LABEL: &str = "__schema__";
pub const SCHEMA_LABEL_BYTES: &[u8] = b"__schema__";

pub const PHYSICAL_TABLE_LABEL: &str = "__physical_table__";
pub const PHYSICAL_TABLE_LABEL_BYTES: &[u8] = b"__physical_table__";

pub const FIELD_NAME_LABEL: &str = "__field__";
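/// A metrics exposition parsed from the Prometheus/OpenMetrics text format.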
pub struct Metrics {
    pub exposition: MetricsExposition<PrometheusType, PrometheusValue>,
}
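/// Returns the metric (table) name declared by the query's `__name__` matcher,
/// or an error if the matcher is missing.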
pub fn table_name(q: &Query) -> Result<String> {
    let label_matches = &q.matchers;

    label_matches
        .iter()
        .find_map(|m| {
            if m.name == METRIC_NAME_LABEL {
                Some(m.value.clone())
            } else {
                None
            }
        })
        .context(error::InvalidPromRemoteRequestSnafu {
            msg: "missing '__name__' label in timeseries",
        })
}
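/// Extracts the target schema from a remote read request: a `__schema__`
/// equality matcher takes precedence over a `__database__` one; returns `None`
/// when neither is present.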
pub fn extract_schema_from_read_request(request: &ReadRequest) -> Option<String> {
    for query in &request.queries {
        for matcher in &query.matchers {
            if matcher.name == SCHEMA_LABEL && matcher.r#type == MatcherType::Eq as i32 {
                return Some(matcher.value.clone());
            }
        }
    }

    for query in &request.queries {
        for matcher in &query.matchers {
            if matcher.name == DATABASE_LABEL && matcher.r#type == MatcherType::Eq as i32 {
                return Some(matcher.value.clone());
            }
        }
    }

    None
}
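/// Converts a remote read `Query` into a DataFusion logical plan. The time range
/// becomes timestamp bounds and each label matcher becomes a filter expression
/// (regex matchers are translated into `regexp_match(...) IS [NOT] NULL` checks);
/// `__name__`, `__schema__` and `__database__` matchers are skipped here because
/// they only select the table and schema.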
#[tracing::instrument(skip_all)]
pub fn query_to_plan(dataframe: DataFrame, q: &Query) -> Result<LogicalPlan> {
    let DataFrame::DataFusion(dataframe) = dataframe;

    let start_timestamp_ms = q.start_timestamp_ms;
    let end_timestamp_ms = q.end_timestamp_ms;

    let label_matches = &q.matchers;

    let mut conditions = Vec::with_capacity(label_matches.len() + 1);

    conditions.push(col(greptime_timestamp()).gt_eq(lit_timestamp_millisecond(start_timestamp_ms)));
    conditions.push(col(greptime_timestamp()).lt_eq(lit_timestamp_millisecond(end_timestamp_ms)));

    for m in label_matches {
        let name = &m.name;

        if name == METRIC_NAME_LABEL || name == SCHEMA_LABEL || name == DATABASE_LABEL {
            continue;
        }

        let value = &m.value;
        let m_type = MatcherType::try_from(m.r#type).map_err(|e| {
            error::InvalidPromRemoteRequestSnafu {
                msg: format!("invalid LabelMatcher type, decode error: {e}"),
            }
            .build()
        })?;

        match m_type {
            MatcherType::Eq => {
                conditions.push(col(name).eq(lit(value)));
            }
            MatcherType::Neq => {
                conditions.push(col(name).not_eq(lit(value)));
            }
            MatcherType::Re => {
                conditions.push(regexp_match(col(name), lit(value), None).is_not_null());
            }
            MatcherType::Nre => {
                conditions.push(regexp_match(col(name), lit(value), None).is_null());
            }
        }
    }
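    // Safe to unwrap: `conditions` always holds at least the two timestamp bounds pushed above.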
    let conditions = conditions.into_iter().reduce(Expr::and).unwrap();

    let dataframe = dataframe
        .filter(conditions)
        .context(error::DataFrameSnafu)?;

    Ok(dataframe.into_parts().1)
}
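/// Creates a `Label` from a name/value pair.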
#[inline]
fn new_label(name: String, value: String) -> Label {
    Label { name, value }
}
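/// Builds a millisecond-precision timestamp literal expression.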
fn lit_timestamp_millisecond(ts: i64) -> Expr {
    Expr::Literal(ScalarValue::TimestampMillisecond(Some(ts), None), None)
}
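/// Identifies a time series by its full label set.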
#[derive(Debug)]
struct TimeSeriesId {
    labels: Vec<Label>,
}
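// The generated `Label` type doesn't provide `Eq`, `Hash` or `Ord`, so time series
// ids are compared manually by their label names and values.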
impl PartialEq for TimeSeriesId {
    fn eq(&self, other: &Self) -> bool {
        if self.labels.len() != other.labels.len() {
            return false;
        }

        self.labels
            .iter()
            .zip(other.labels.iter())
            .all(|(l, r)| l.name == r.name && l.value == r.value)
    }
}
impl Eq for TimeSeriesId {}

impl Hash for TimeSeriesId {
    fn hash<H: Hasher>(&self, state: &mut H) {
        for label in &self.labels {
            label.name.hash(state);
            label.value.hash(state);
        }
    }
}

impl Ord for TimeSeriesId {
    fn cmp(&self, other: &Self) -> Ordering {
        let ordering = self.labels.len().cmp(&other.labels.len());
        if ordering != Ordering::Equal {
            return ordering;
        }

        for (l, r) in self.labels.iter().zip(other.labels.iter()) {
            let ordering = l.name.cmp(&r.name);

            if ordering != Ordering::Equal {
                return ordering;
            }

            let ordering = l.value.cmp(&r.value);

            if ordering != Ordering::Equal {
                return ordering;
            }
        }
        Ordering::Equal
    }
}

impl PartialOrd for TimeSeriesId {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}
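/// Collects the time series id (label set) of every row in the record batch.
/// The metric name is always attached as the `__name__` label; the timestamp and
/// value columns are excluded from the labels.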
fn collect_timeseries_ids(table_name: &str, recordbatch: &RecordBatch) -> Vec<TimeSeriesId> {
    let row_count = recordbatch.num_rows();
    let mut timeseries_ids = Vec::with_capacity(row_count);

    let column_names = recordbatch
        .schema
        .column_schemas()
        .iter()
        .map(|column_schema| &column_schema.name);
    let columns = column_names
        .enumerate()
        .filter(|(_, column_name)| {
            *column_name != greptime_timestamp() && *column_name != greptime_value()
        })
        .map(|(i, column_name)| {
            (
                column_name,
                recordbatch.iter_column_as_string(i).collect::<Vec<_>>(),
            )
        })
        .collect::<Vec<_>>();

    for row in 0..row_count {
        let mut labels = Vec::with_capacity(recordbatch.num_columns() - 1);
        labels.push(new_label(
            METRIC_NAME_LABEL.to_string(),
            table_name.to_string(),
        ));

        for (column_name, column_values) in columns.iter() {
            if let Some(value) = &column_values[row] {
                labels.push(new_label((*column_name).clone(), value.clone()));
            }
        }
        timeseries_ids.push(TimeSeriesId { labels });
    }
    timeseries_ids
}
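/// Converts all record batches of a query result into Prometheus time series.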
pub fn recordbatches_to_timeseries(
    table_name: &str,
    recordbatches: RecordBatches,
) -> Result<Vec<TimeSeries>> {
    Ok(recordbatches
        .take()
        .into_iter()
        .map(|x| recordbatch_to_timeseries(table_name, x))
        .collect::<Result<Vec<_>>>()?
        .into_iter()
        .flatten()
        .collect())
}
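/// Converts one record batch into Prometheus time series, grouping rows by their
/// label set and skipping samples whose timestamp or value is null.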
fn recordbatch_to_timeseries(table: &str, recordbatch: RecordBatch) -> Result<Vec<TimeSeries>> {
    let ts_column = recordbatch.column_by_name(greptime_timestamp()).context(
        error::InvalidPromRemoteReadQueryResultSnafu {
            msg: "missing greptime_timestamp column in query result",
        },
    )?;
    let ts_column = ts_column
        .as_primitive_opt::<TimestampMillisecondType>()
        .with_context(|| error::InvalidPromRemoteReadQueryResultSnafu {
            msg: format!(
                "Expect timestamp column of datatype Timestamp(Millisecond), actual {:?}",
                ts_column.data_type()
            ),
        })?;

    let field_column = recordbatch.column_by_name(greptime_value()).context(
        error::InvalidPromRemoteReadQueryResultSnafu {
            msg: "missing greptime_value column in query result",
        },
    )?;
    let field_column = field_column
        .as_primitive_opt::<Float64Type>()
        .with_context(|| error::InvalidPromRemoteReadQueryResultSnafu {
            msg: format!(
                "Expect value column of datatype Float64, actual {:?}",
                field_column.data_type()
            ),
        })?;

    let timeseries_ids = collect_timeseries_ids(table, &recordbatch);
    let mut timeseries_map: BTreeMap<&TimeSeriesId, TimeSeries> = BTreeMap::default();

    for (row, timeseries_id) in timeseries_ids.iter().enumerate() {
        let timeseries = timeseries_map
            .entry(timeseries_id)
            .or_insert_with(|| TimeSeries {
                labels: timeseries_id.labels.clone(),
                ..Default::default()
            });

        if ts_column.is_null(row) || field_column.is_null(row) {
            continue;
        }

        let value = field_column.value(row);
        let timestamp = ts_column.value(row);
        let sample = Sample { value, timestamp };

        timeseries.samples.push(sample);
    }

    Ok(timeseries_map.into_values().collect())
}
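/// Converts a Prometheus remote write request into gRPC row insert requests,
/// grouping samples by metric (table) name, and returns them together with the
/// total number of rows.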
pub fn to_grpc_row_insert_requests(request: &WriteRequest) -> Result<(RowInsertRequests, usize)> {
    let _timer = crate::metrics::METRIC_HTTP_PROM_STORE_CONVERT_ELAPSED.start_timer();

    let mut multi_table_data = MultiTableData::new();

    for series in &request.timeseries {
        let table_name = &series
            .labels
            .iter()
            .find(|label| label.name == METRIC_NAME_LABEL)
            .context(error::InvalidPromRemoteRequestSnafu {
                msg: "missing '__name__' label in time-series",
            })?
            .value;

        let num_columns = series.labels.len() + 1;

        let table_data = multi_table_data.get_or_default_table_data(
            table_name,
            num_columns,
            series.samples.len(),
        );

        let kvs = series.labels.iter().filter_map(|label| {
            if label.name == METRIC_NAME_LABEL {
                None
            } else {
                Some((label.name.clone(), label.value.clone()))
            }
        });

        if series.samples.len() == 1 {
            let mut one_row = table_data.alloc_one_row();

            row_writer::write_tags(table_data, kvs, &mut one_row)?;
            row_writer::write_f64(
                table_data,
                greptime_value(),
                series.samples[0].value,
                &mut one_row,
            )?;
            row_writer::write_ts_to_millis(
                table_data,
                greptime_timestamp(),
                Some(series.samples[0].timestamp),
                Precision::Millisecond,
                &mut one_row,
            )?;

            table_data.add_row(one_row);
        } else {
            for Sample { value, timestamp } in &series.samples {
                let mut one_row = table_data.alloc_one_row();

                let kvs = kvs.clone();
                row_writer::write_tags(table_data, kvs, &mut one_row)?;
                row_writer::write_f64(table_data, greptime_value(), *value, &mut one_row)?;
                row_writer::write_ts_to_millis(
                    table_data,
                    greptime_timestamp(),
                    Some(*timestamp),
                    Precision::Millisecond,
                    &mut one_row,
                )?;

                table_data.add_row(one_row);
            }
        }
    }

    Ok(multi_table_data.into_row_insert_requests())
}
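/// Decompresses a snappy raw-format buffer, as used by the Prometheus remote protocol.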
#[inline]
pub fn snappy_decompress(buf: &[u8]) -> Result<Vec<u8>> {
    let mut decoder = Decoder::new();
    decoder
        .decompress_vec(buf)
        .context(error::DecompressSnappyPromRemoteRequestSnafu)
}
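/// Compresses a buffer with the snappy raw format. Round-trips with
/// [`snappy_decompress`], e.g. (illustrative only, not a doc-test):
/// `snappy_decompress(&snappy_compress(b"abc")?)? == b"abc"`.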
#[inline]
pub fn snappy_compress(buf: &[u8]) -> Result<Vec<u8>> {
    let mut encoder = Encoder::new();
    encoder
        .compress_vec(buf)
        .context(error::CompressPromRemoteRequestSnafu)
}
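/// Decompresses a zstd-encoded buffer.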
#[inline]
pub fn zstd_decompress(buf: &[u8]) -> Result<Vec<u8>> {
    zstd::stream::decode_all(buf).context(error::DecompressZstdPromRemoteRequestSnafu)
}
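/// Builds three mock time series (`metric1`, `metric2`, `metric3`) for tests.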
pub fn mock_timeseries() -> Vec<TimeSeries> {
    vec![
        TimeSeries {
            labels: vec![
                new_label(METRIC_NAME_LABEL.to_string(), "metric1".to_string()),
                new_label("job".to_string(), "spark".to_string()),
            ],
            samples: vec![
                Sample {
                    value: 1.0f64,
                    timestamp: 1000,
                },
                Sample {
                    value: 2.0f64,
                    timestamp: 2000,
                },
            ],
            ..Default::default()
        },
        TimeSeries {
            labels: vec![
                new_label(METRIC_NAME_LABEL.to_string(), "metric2".to_string()),
                new_label("instance".to_string(), "test_host1".to_string()),
                new_label("idc".to_string(), "z001".to_string()),
            ],
            samples: vec![
                Sample {
                    value: 3.0f64,
                    timestamp: 1000,
                },
                Sample {
                    value: 4.0f64,
                    timestamp: 2000,
                },
            ],
            ..Default::default()
        },
        TimeSeries {
            labels: vec![
                new_label(METRIC_NAME_LABEL.to_string(), "metric3".to_string()),
                new_label("idc".to_string(), "z002".to_string()),
                new_label("app".to_string(), "biz".to_string()),
            ],
            samples: vec![
                Sample {
                    value: 5.0f64,
                    timestamp: 1000,
                },
                Sample {
                    value: 6.0f64,
                    timestamp: 2000,
                },
                Sample {
                    value: 7.0f64,
                    timestamp: 3000,
                },
            ],
            ..Default::default()
        },
    ]
}
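/// Builds mock time series that introduce additional labels (`new_label1`,
/// `new_label2`) for tests.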
pub fn mock_timeseries_new_label() -> Vec<TimeSeries> {
    let ts_demo_metrics = TimeSeries {
        labels: vec![
            new_label(METRIC_NAME_LABEL.to_string(), "demo_metrics".to_string()),
            new_label("idc".to_string(), "idc3".to_string()),
            new_label("new_label1".to_string(), "foo".to_string()),
        ],
        samples: vec![Sample {
            value: 42.0,
            timestamp: 3000,
        }],
        ..Default::default()
    };
    let ts_multi_labels = TimeSeries {
        labels: vec![
            new_label(METRIC_NAME_LABEL.to_string(), "metric1".to_string()),
            new_label("idc".to_string(), "idc4".to_string()),
            new_label("env".to_string(), "prod".to_string()),
            new_label("host".to_string(), "host9".to_string()),
            new_label("new_label2".to_string(), "bar".to_string()),
        ],
        samples: vec![Sample {
            value: 99.0,
            timestamp: 4000,
        }],
        ..Default::default()
    };

    vec![ts_demo_metrics, ts_multi_labels]
}
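/// Builds mock time series carrying the special `__database__` and
/// `__physical_table__` labels for tests.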
pub fn mock_timeseries_special_labels() -> Vec<TimeSeries> {
    let idc3_schema = TimeSeries {
        labels: vec![
            new_label(METRIC_NAME_LABEL.to_string(), "idc3_lo_table".to_string()),
            new_label("__database__".to_string(), "idc3".to_string()),
            new_label("__physical_table__".to_string(), "f1".to_string()),
        ],
        samples: vec![Sample {
            value: 42.0,
            timestamp: 3000,
        }],
        ..Default::default()
    };
    let idc4_schema = TimeSeries {
        labels: vec![
            new_label(
                METRIC_NAME_LABEL.to_string(),
                "idc4_local_table".to_string(),
            ),
            new_label("__database__".to_string(), "idc4".to_string()),
            new_label("__physical_table__".to_string(), "f2".to_string()),
        ],
        samples: vec![Sample {
            value: 99.0,
            timestamp: 4000,
        }],
        ..Default::default()
    };

    vec![idc3_schema, idc4_schema]
}
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use api::prom_store::remote::LabelMatcher;
    use api::v1::{ColumnDataType, Row, SemanticType};
    use datafusion::prelude::SessionContext;
    use datatypes::data_type::ConcreteDataType;
    use datatypes::schema::{ColumnSchema, Schema};
    use datatypes::vectors::{Float64Vector, StringVector, TimestampMillisecondVector};
    use table::table::adapter::DfTableProviderAdapter;
    use table::test_util::MemTable;

    use super::*;

    const EQ_TYPE: i32 = MatcherType::Eq as i32;
    const NEQ_TYPE: i32 = MatcherType::Neq as i32;
    const RE_TYPE: i32 = MatcherType::Re as i32;

    #[test]
    fn test_table_name() {
        let q = Query {
            start_timestamp_ms: 1000,
            end_timestamp_ms: 2000,
            matchers: vec![],
            ..Default::default()
        };
        let err = table_name(&q).unwrap_err();
        assert!(matches!(err, error::Error::InvalidPromRemoteRequest { .. }));

        let q = Query {
            start_timestamp_ms: 1000,
            end_timestamp_ms: 2000,
            matchers: vec![LabelMatcher {
                name: METRIC_NAME_LABEL.to_string(),
                value: "test".to_string(),
                r#type: EQ_TYPE,
            }],
            ..Default::default()
        };
        assert_eq!("test", table_name(&q).unwrap());
    }
    #[test]
    fn test_query_to_plan() {
        let q = Query {
            start_timestamp_ms: 1000,
            end_timestamp_ms: 2000,
            matchers: vec![LabelMatcher {
                name: METRIC_NAME_LABEL.to_string(),
                value: "test".to_string(),
                r#type: EQ_TYPE,
            }],
            ..Default::default()
        };

        let schema = Arc::new(Schema::new(vec![
            ColumnSchema::new(
                greptime_timestamp(),
                ConcreteDataType::timestamp_millisecond_datatype(),
                true,
            ),
            ColumnSchema::new(greptime_value(), ConcreteDataType::float64_datatype(), true),
            ColumnSchema::new("instance", ConcreteDataType::string_datatype(), true),
            ColumnSchema::new("job", ConcreteDataType::string_datatype(), true),
        ]));
        let recordbatch = RecordBatch::new(
            schema,
            vec![
                Arc::new(TimestampMillisecondVector::from_vec(vec![1000])) as _,
                Arc::new(Float64Vector::from_vec(vec![3.0])) as _,
                Arc::new(StringVector::from(vec!["host1"])) as _,
                Arc::new(StringVector::from(vec!["job"])) as _,
            ],
        )
        .unwrap();

        let ctx = SessionContext::new();
        let table = MemTable::table("test", recordbatch);
        let table_provider = Arc::new(DfTableProviderAdapter::new(table));

        let dataframe = ctx.read_table(table_provider.clone()).unwrap();
        let plan = query_to_plan(DataFrame::DataFusion(dataframe), &q).unwrap();
        let display_string = format!("{}", plan.display_indent());

        let ts_col = greptime_timestamp();
        let expected = format!(
            "Filter: ?table?.{} >= TimestampMillisecond(1000, None) AND ?table?.{} <= TimestampMillisecond(2000, None)\n  TableScan: ?table?",
            ts_col, ts_col
        );
        assert_eq!(expected, display_string);

        let q = Query {
            start_timestamp_ms: 1000,
            end_timestamp_ms: 2000,
            matchers: vec![
                LabelMatcher {
                    name: METRIC_NAME_LABEL.to_string(),
                    value: "test".to_string(),
                    r#type: EQ_TYPE,
                },
                LabelMatcher {
                    name: "job".to_string(),
                    value: "*prom*".to_string(),
                    r#type: RE_TYPE,
                },
                LabelMatcher {
                    name: "instance".to_string(),
                    value: "localhost".to_string(),
                    r#type: NEQ_TYPE,
                },
            ],
            ..Default::default()
        };

        let dataframe = ctx.read_table(table_provider).unwrap();
        let plan = query_to_plan(DataFrame::DataFusion(dataframe), &q).unwrap();
        let display_string = format!("{}", plan.display_indent());

        let ts_col = greptime_timestamp();
        let expected = format!(
            "Filter: ?table?.{} >= TimestampMillisecond(1000, None) AND ?table?.{} <= TimestampMillisecond(2000, None) AND regexp_match(?table?.job, Utf8(\"*prom*\")) IS NOT NULL AND ?table?.instance != Utf8(\"localhost\")\n  TableScan: ?table?",
            ts_col, ts_col
        );
        assert_eq!(expected, display_string);
    }
    fn column_schemas_with(
        mut kts_iter: Vec<(&str, ColumnDataType, SemanticType)>,
    ) -> Vec<api::v1::ColumnSchema> {
        kts_iter.push((
            greptime_value(),
            ColumnDataType::Float64,
            SemanticType::Field,
        ));
        kts_iter.push((
            greptime_timestamp(),
            ColumnDataType::TimestampMillisecond,
            SemanticType::Timestamp,
        ));

        kts_iter
            .into_iter()
            .map(|(k, t, s)| api::v1::ColumnSchema {
                column_name: k.to_string(),
                datatype: t as i32,
                semantic_type: s as i32,
                ..Default::default()
            })
            .collect()
    }

    fn make_row_with_label(l1: &str, value: f64, timestamp: i64) -> Row {
        Row {
            values: vec![
                api::v1::Value {
                    value_data: Some(api::v1::value::ValueData::StringValue(l1.to_string())),
                },
                api::v1::Value {
                    value_data: Some(api::v1::value::ValueData::F64Value(value)),
                },
                api::v1::Value {
                    value_data: Some(api::v1::value::ValueData::TimestampMillisecondValue(
                        timestamp,
                    )),
                },
            ],
        }
    }

    fn make_row_with_2_labels(l1: &str, l2: &str, value: f64, timestamp: i64) -> Row {
        Row {
            values: vec![
                api::v1::Value {
                    value_data: Some(api::v1::value::ValueData::StringValue(l1.to_string())),
                },
                api::v1::Value {
                    value_data: Some(api::v1::value::ValueData::StringValue(l2.to_string())),
                },
                api::v1::Value {
                    value_data: Some(api::v1::value::ValueData::F64Value(value)),
                },
                api::v1::Value {
                    value_data: Some(api::v1::value::ValueData::TimestampMillisecondValue(
                        timestamp,
                    )),
                },
            ],
        }
    }
    #[test]
    fn test_write_request_to_row_insert_exprs() {
        let write_request = WriteRequest {
            timeseries: mock_timeseries(),
            ..Default::default()
        };

        let mut exprs = to_grpc_row_insert_requests(&write_request)
            .unwrap()
            .0
            .inserts;
        exprs.sort_unstable_by(|l, r| l.table_name.cmp(&r.table_name));
        assert_eq!(3, exprs.len());
        assert_eq!("metric1", exprs[0].table_name);
        assert_eq!("metric2", exprs[1].table_name);
        assert_eq!("metric3", exprs[2].table_name);

        let rows = exprs[0].rows.as_ref().unwrap();
        let schema = &rows.schema;
        let rows = &rows.rows;
        assert_eq!(2, rows.len());
        assert_eq!(3, schema.len());
        assert_eq!(
            column_schemas_with(vec![("job", ColumnDataType::String, SemanticType::Tag)]),
            *schema
        );
        assert_eq!(
            &vec![
                make_row_with_label("spark", 1.0, 1000),
                make_row_with_label("spark", 2.0, 2000),
            ],
            rows
        );

        let rows = exprs[1].rows.as_ref().unwrap();
        let schema = &rows.schema;
        let rows = &rows.rows;
        assert_eq!(2, rows.len());
        assert_eq!(4, schema.len());
        assert_eq!(
            column_schemas_with(vec![
                ("instance", ColumnDataType::String, SemanticType::Tag),
                ("idc", ColumnDataType::String, SemanticType::Tag)
            ]),
            *schema
        );
        assert_eq!(
            &vec![
                make_row_with_2_labels("test_host1", "z001", 3.0, 1000),
                make_row_with_2_labels("test_host1", "z001", 4.0, 2000),
            ],
            rows
        );

        let rows = exprs[2].rows.as_ref().unwrap();
        let schema = &rows.schema;
        let rows = &rows.rows;
        assert_eq!(3, rows.len());
        assert_eq!(4, schema.len());
        assert_eq!(
            column_schemas_with(vec![
                ("idc", ColumnDataType::String, SemanticType::Tag),
                ("app", ColumnDataType::String, SemanticType::Tag)
            ]),
            *schema
        );
        assert_eq!(
            &vec![
                make_row_with_2_labels("z002", "biz", 5.0, 1000),
                make_row_with_2_labels("z002", "biz", 6.0, 2000),
                make_row_with_2_labels("z002", "biz", 7.0, 3000),
            ],
            rows
        );
    }
    #[test]
    fn test_recordbatches_to_timeseries() {
        let schema = Arc::new(Schema::new(vec![
            ColumnSchema::new(
                greptime_timestamp(),
                ConcreteDataType::timestamp_millisecond_datatype(),
                true,
            ),
            ColumnSchema::new(greptime_value(), ConcreteDataType::float64_datatype(), true),
            ColumnSchema::new("instance", ConcreteDataType::string_datatype(), true),
        ]));

        let recordbatches = RecordBatches::try_new(
            schema.clone(),
            vec![
                RecordBatch::new(
                    schema.clone(),
                    vec![
                        Arc::new(TimestampMillisecondVector::from_vec(vec![1000])) as _,
                        Arc::new(Float64Vector::from_vec(vec![3.0])) as _,
                        Arc::new(StringVector::from(vec!["host1"])) as _,
                    ],
                )
                .unwrap(),
                RecordBatch::new(
                    schema,
                    vec![
                        Arc::new(TimestampMillisecondVector::from_vec(vec![2000])) as _,
                        Arc::new(Float64Vector::from_vec(vec![7.0])) as _,
                        Arc::new(StringVector::from(vec!["host2"])) as _,
                    ],
                )
                .unwrap(),
            ],
        )
        .unwrap();

        let timeseries = recordbatches_to_timeseries("metric1", recordbatches).unwrap();
        assert_eq!(2, timeseries.len());

        assert_eq!(
            vec![
                Label {
                    name: METRIC_NAME_LABEL.to_string(),
                    value: "metric1".to_string(),
                },
                Label {
                    name: "instance".to_string(),
                    value: "host1".to_string(),
                },
            ],
            timeseries[0].labels
        );

        assert_eq!(
            timeseries[0].samples,
            vec![Sample {
                value: 3.0,
                timestamp: 1000,
            }]
        );

        assert_eq!(
            vec![
                Label {
                    name: METRIC_NAME_LABEL.to_string(),
                    value: "metric1".to_string(),
                },
                Label {
                    name: "instance".to_string(),
                    value: "host2".to_string(),
                },
            ],
            timeseries[1].labels
        );
        assert_eq!(
            timeseries[1].samples,
            vec![Sample {
                value: 7.0,
                timestamp: 2000,
            }]
        );
    }
}