use std::cmp::Ordering;
use std::collections::BTreeMap;
use std::hash::{Hash, Hasher};

use api::prom_store::remote::label_matcher::Type as MatcherType;
use api::prom_store::remote::{Label, Query, ReadRequest, Sample, TimeSeries, WriteRequest};
use api::v1::RowInsertRequests;
use common_grpc::precision::Precision;
use common_query::prelude::{greptime_timestamp, greptime_value};
use common_recordbatch::{RecordBatch, RecordBatches};
use common_telemetry::tracing;
use common_time::timestamp::TimeUnit;
use datafusion::prelude::{Expr, col, lit, regexp_match};
use datafusion_common::ScalarValue;
use datafusion_expr::LogicalPlan;
use datatypes::prelude::{ConcreteDataType, Value};
use openmetrics_parser::{MetricsExposition, PrometheusType, PrometheusValue};
use query::dataframe::DataFrame;
use snafu::{OptionExt, ResultExt, ensure};
use snap::raw::{Decoder, Encoder};

use crate::error::{self, Result};
use crate::row_writer::{self, MultiTableData};

pub const METRIC_NAME_LABEL: &str = "__name__";
pub const METRIC_NAME_LABEL_BYTES: &[u8] = b"__name__";

pub const DATABASE_LABEL: &str = "__database__";
pub const DATABASE_LABEL_BYTES: &[u8] = b"__database__";

pub const SCHEMA_LABEL: &str = "__schema__";
pub const SCHEMA_LABEL_BYTES: &[u8] = b"__schema__";

pub const PHYSICAL_TABLE_LABEL: &str = "__physical_table__";
pub const PHYSICAL_TABLE_LABEL_BYTES: &[u8] = b"__physical_table__";

pub const FIELD_NAME_LABEL: &str = "__field__";

/// A parsed Prometheus/OpenMetrics text exposition.
pub struct Metrics {
    pub exposition: MetricsExposition<PrometheusType, PrometheusValue>,
}

/// Returns the metric (table) name carried by the query's `__name__` matcher.
pub fn table_name(q: &Query) -> Result<String> {
    let label_matches = &q.matchers;

    label_matches
        .iter()
        .find_map(|m| {
            if m.name == METRIC_NAME_LABEL {
                Some(m.value.clone())
            } else {
                None
            }
        })
        .context(error::InvalidPromRemoteRequestSnafu {
            msg: "missing '__name__' label in timeseries",
        })
}

/// Extracts the target schema from a remote read request: an exact `__schema__`
/// matcher takes precedence, and `__database__` is used as a fallback.
pub fn extract_schema_from_read_request(request: &ReadRequest) -> Option<String> {
    for query in &request.queries {
        for matcher in &query.matchers {
            if matcher.name == SCHEMA_LABEL && matcher.r#type == MatcherType::Eq as i32 {
                return Some(matcher.value.clone());
            }
        }
    }

    for query in &request.queries {
        for matcher in &query.matchers {
            if matcher.name == DATABASE_LABEL && matcher.r#type == MatcherType::Eq as i32 {
                return Some(matcher.value.clone());
            }
        }
    }

    None
}

/// Converts a remote read [`Query`] into a DataFusion [`LogicalPlan`]: the time range
/// becomes two bounds on the timestamp column, and every remaining label matcher is
/// turned into a filter expression.
#[tracing::instrument(skip_all)]
pub fn query_to_plan(dataframe: DataFrame, q: &Query) -> Result<LogicalPlan> {
    let DataFrame::DataFusion(dataframe) = dataframe;

    let start_timestamp_ms = q.start_timestamp_ms;
    let end_timestamp_ms = q.end_timestamp_ms;

    let label_matches = &q.matchers;

    let mut conditions = Vec::with_capacity(label_matches.len() + 1);

    conditions.push(col(greptime_timestamp()).gt_eq(lit_timestamp_millisecond(start_timestamp_ms)));
    conditions.push(col(greptime_timestamp()).lt_eq(lit_timestamp_millisecond(end_timestamp_ms)));

    for m in label_matches {
        let name = &m.name;

        if name == METRIC_NAME_LABEL || name == SCHEMA_LABEL || name == DATABASE_LABEL {
            continue;
        }

        let value = &m.value;
        let m_type = MatcherType::try_from(m.r#type).map_err(|e| {
            error::InvalidPromRemoteRequestSnafu {
                msg: format!("invalid LabelMatcher type, decode error: {e}"),
            }
            .build()
        })?;

        match m_type {
            MatcherType::Eq => {
                conditions.push(col(name).eq(lit(value)));
            }
            MatcherType::Neq => {
                conditions.push(col(name).not_eq(lit(value)));
            }
            // Regex match: keep rows where the label matches the pattern.
            MatcherType::Re => {
                conditions.push(regexp_match(col(name), lit(value), None).is_not_null());
            }
            // Negated regex match: keep rows where the label does not match.
            MatcherType::Nre => {
                conditions.push(regexp_match(col(name), lit(value), None).is_null());
            }
        }
    }

    // Safe to unwrap: `conditions` always holds at least the two timestamp bounds.
    let conditions = conditions.into_iter().reduce(Expr::and).unwrap();

    let dataframe = dataframe
        .filter(conditions)
        .context(error::DataFrameSnafu)?;

    Ok(dataframe.into_parts().1)
}

#[inline]
fn new_label(name: String, value: String) -> Label {
    Label { name, value }
}

fn lit_timestamp_millisecond(ts: i64) -> Expr {
    Expr::Literal(ScalarValue::TimestampMillisecond(Some(ts), None), None)
}

/// The identity of a time series: its full label set.
#[derive(Debug)]
struct TimeSeriesId {
    labels: Vec<Label>,
}

/// Two ids are equal when their label lists match pairwise by name and value.
impl PartialEq for TimeSeriesId {
    fn eq(&self, other: &Self) -> bool {
        if self.labels.len() != other.labels.len() {
            return false;
        }

        self.labels
            .iter()
            .zip(other.labels.iter())
            .all(|(l, r)| l.name == r.name && l.value == r.value)
    }
}
impl Eq for TimeSeriesId {}

impl Hash for TimeSeriesId {
    fn hash<H: Hasher>(&self, state: &mut H) {
        for label in &self.labels {
            label.name.hash(state);
            label.value.hash(state);
        }
    }
}

/// Ordered by label count first, then lexicographically by label name and value.
impl Ord for TimeSeriesId {
    fn cmp(&self, other: &Self) -> Ordering {
        let ordering = self.labels.len().cmp(&other.labels.len());
        if ordering != Ordering::Equal {
            return ordering;
        }

        for (l, r) in self.labels.iter().zip(other.labels.iter()) {
            let ordering = l.name.cmp(&r.name);

            if ordering != Ordering::Equal {
                return ordering;
            }

            let ordering = l.value.cmp(&r.value);

            if ordering != Ordering::Equal {
                return ordering;
            }
        }
        Ordering::Equal
    }
}

impl PartialOrd for TimeSeriesId {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

/// Collects the label set of every row in the record batch; the `__name__` label is
/// derived from the table name, the rest from the non-timestamp, non-value columns.
fn collect_timeseries_ids(table_name: &str, recordbatch: &RecordBatch) -> Vec<TimeSeriesId> {
    let row_count = recordbatch.num_rows();
    let mut timeseries_ids = Vec::with_capacity(row_count);

    for row in 0..row_count {
        let mut labels = Vec::with_capacity(recordbatch.num_columns() - 1);
        labels.push(new_label(
            METRIC_NAME_LABEL.to_string(),
            table_name.to_string(),
        ));

        for (i, column_schema) in recordbatch.schema.column_schemas().iter().enumerate() {
            if column_schema.name == greptime_value() || column_schema.name == greptime_timestamp()
            {
                continue;
            }

            let column = &recordbatch.columns()[i];
            // A null value means this row does not carry the label at all.
            if column.is_null(row) {
                continue;
            }

            let value = column.get(row).to_string();
            labels.push(new_label(column_schema.name.clone(), value));
        }
        timeseries_ids.push(TimeSeriesId { labels });
    }
    timeseries_ids
}

/// Converts query result record batches into Prometheus remote read time series.
pub fn recordbatches_to_timeseries(
    table_name: &str,
    recordbatches: RecordBatches,
) -> Result<Vec<TimeSeries>> {
    Ok(recordbatches
        .take()
        .into_iter()
        .map(|x| recordbatch_to_timeseries(table_name, x))
        .collect::<Result<Vec<_>>>()?
        .into_iter()
        .flatten()
        .collect())
}

fn recordbatch_to_timeseries(table: &str, recordbatch: RecordBatch) -> Result<Vec<TimeSeries>> {
    let ts_column = recordbatch.column_by_name(greptime_timestamp()).context(
        error::InvalidPromRemoteReadQueryResultSnafu {
            msg: "missing greptime_timestamp column in query result",
        },
    )?;
    ensure!(
        ts_column.data_type() == ConcreteDataType::timestamp_millisecond_datatype(),
        error::InvalidPromRemoteReadQueryResultSnafu {
            msg: format!(
                "Expect timestamp column of datatype Timestamp(Millisecond), actual {:?}",
                ts_column.data_type()
            )
        }
    );

    let field_column = recordbatch.column_by_name(greptime_value()).context(
        error::InvalidPromRemoteReadQueryResultSnafu {
            msg: "missing greptime_value column in query result",
        },
    )?;
    ensure!(
        field_column.data_type() == ConcreteDataType::float64_datatype(),
        error::InvalidPromRemoteReadQueryResultSnafu {
            msg: format!(
                "Expect value column of datatype Float64, actual {:?}",
                field_column.data_type()
            )
        }
    );

    // Group rows into time series keyed by their label sets; a BTreeMap keeps the
    // output ordering deterministic.
    let timeseries_ids = collect_timeseries_ids(table, &recordbatch);
    let mut timeseries_map: BTreeMap<&TimeSeriesId, TimeSeries> = BTreeMap::default();

    for (row, timeseries_id) in timeseries_ids.iter().enumerate() {
        let timeseries = timeseries_map
            .entry(timeseries_id)
            .or_insert_with(|| TimeSeries {
                labels: timeseries_id.labels.clone(),
                ..Default::default()
            });

        if ts_column.is_null(row) || field_column.is_null(row) {
            continue;
        }

        let value: f64 = match field_column.get(row) {
            Value::Float64(value) => value.into(),
            _ => unreachable!("checked by the \"ensure\" above"),
        };
        let timestamp = match ts_column.get(row) {
            Value::Timestamp(t) if t.unit() == TimeUnit::Millisecond => t.value(),
            _ => unreachable!("checked by the \"ensure\" above"),
        };
        let sample = Sample { value, timestamp };

        timeseries.samples.push(sample);
    }

    Ok(timeseries_map.into_values().collect())
}

/// Converts a Prometheus remote write request into gRPC [`RowInsertRequests`].
pub fn to_grpc_row_insert_requests(request: &WriteRequest) -> Result<(RowInsertRequests, usize)> {
    let _timer = crate::metrics::METRIC_HTTP_PROM_STORE_CONVERT_ELAPSED.start_timer();

    let mut multi_table_data = MultiTableData::new();

    for series in &request.timeseries {
        let table_name = &series
            .labels
            .iter()
            .find(|label| label.name == METRIC_NAME_LABEL)
            .context(error::InvalidPromRemoteRequestSnafu {
                msg: "missing '__name__' label in time-series",
            })?
            .value;

        // Tags (all labels except `__name__`) + value + timestamp.
        let num_columns = series.labels.len() + 1;

        let table_data = multi_table_data.get_or_default_table_data(
            table_name,
            num_columns,
            series.samples.len(),
        );

        let kvs = series.labels.iter().filter_map(|label| {
            if label.name == METRIC_NAME_LABEL {
                None
            } else {
                Some((label.name.clone(), label.value.clone()))
            }
        });

        if series.samples.len() == 1 {
            let mut one_row = table_data.alloc_one_row();

            row_writer::write_tags(table_data, kvs, &mut one_row)?;
            row_writer::write_f64(
                table_data,
                greptime_value(),
                series.samples[0].value,
                &mut one_row,
            )?;
            row_writer::write_ts_to_millis(
                table_data,
                greptime_timestamp(),
                Some(series.samples[0].timestamp),
                Precision::Millisecond,
                &mut one_row,
            )?;

            table_data.add_row(one_row);
        } else {
            for Sample { value, timestamp } in &series.samples {
                let mut one_row = table_data.alloc_one_row();

                // Tags
                let kvs = kvs.clone();
                row_writer::write_tags(table_data, kvs, &mut one_row)?;
                // Value
                row_writer::write_f64(table_data, greptime_value(), *value, &mut one_row)?;
                // Timestamp
                row_writer::write_ts_to_millis(
                    table_data,
                    greptime_timestamp(),
                    Some(*timestamp),
                    Precision::Millisecond,
                    &mut one_row,
                )?;

                table_data.add_row(one_row);
            }
        }
    }

    Ok(multi_table_data.into_row_insert_requests())
}

/// Decompresses a buffer encoded with the snappy block format (raw), as used by the
/// Prometheus remote protocol.
#[inline]
pub fn snappy_decompress(buf: &[u8]) -> Result<Vec<u8>> {
    let mut decoder = Decoder::new();
    decoder
        .decompress_vec(buf)
        .context(error::DecompressSnappyPromRemoteRequestSnafu)
}

/// Compresses a buffer with the snappy block format (raw).
#[inline]
pub fn snappy_compress(buf: &[u8]) -> Result<Vec<u8>> {
    let mut encoder = Encoder::new();
    encoder
        .compress_vec(buf)
        .context(error::CompressPromRemoteRequestSnafu)
}

/// Decompresses a zstd-encoded buffer.
#[inline]
pub fn zstd_decompress(buf: &[u8]) -> Result<Vec<u8>> {
    zstd::stream::decode_all(buf).context(error::DecompressZstdPromRemoteRequestSnafu)
}

/// Builds a fixed set of mock time series used in tests.
pub fn mock_timeseries() -> Vec<TimeSeries> {
    vec![
        TimeSeries {
            labels: vec![
                new_label(METRIC_NAME_LABEL.to_string(), "metric1".to_string()),
                new_label("job".to_string(), "spark".to_string()),
            ],
            samples: vec![
                Sample {
                    value: 1.0f64,
                    timestamp: 1000,
                },
                Sample {
                    value: 2.0f64,
                    timestamp: 2000,
                },
            ],
            ..Default::default()
        },
        TimeSeries {
            labels: vec![
                new_label(METRIC_NAME_LABEL.to_string(), "metric2".to_string()),
                new_label("instance".to_string(), "test_host1".to_string()),
                new_label("idc".to_string(), "z001".to_string()),
            ],
            samples: vec![
                Sample {
                    value: 3.0f64,
                    timestamp: 1000,
                },
                Sample {
                    value: 4.0f64,
                    timestamp: 2000,
                },
            ],
            ..Default::default()
        },
        TimeSeries {
            labels: vec![
                new_label(METRIC_NAME_LABEL.to_string(), "metric3".to_string()),
                new_label("idc".to_string(), "z002".to_string()),
                new_label("app".to_string(), "biz".to_string()),
            ],
            samples: vec![
                Sample {
                    value: 5.0f64,
                    timestamp: 1000,
                },
                Sample {
                    value: 6.0f64,
                    timestamp: 2000,
                },
                Sample {
                    value: 7.0f64,
                    timestamp: 3000,
                },
            ],
            ..Default::default()
        },
    ]
}

/// Mock time series that carry additional, previously unseen labels
/// (`new_label1`, `new_label2`).
pub fn mock_timeseries_new_label() -> Vec<TimeSeries> {
    let ts_demo_metrics = TimeSeries {
        labels: vec![
            new_label(METRIC_NAME_LABEL.to_string(), "demo_metrics".to_string()),
            new_label("idc".to_string(), "idc3".to_string()),
            new_label("new_label1".to_string(), "foo".to_string()),
        ],
        samples: vec![Sample {
            value: 42.0,
            timestamp: 3000,
        }],
        ..Default::default()
    };
    let ts_multi_labels = TimeSeries {
        labels: vec![
            new_label(METRIC_NAME_LABEL.to_string(), "metric1".to_string()),
            new_label("idc".to_string(), "idc4".to_string()),
            new_label("env".to_string(), "prod".to_string()),
            new_label("host".to_string(), "host9".to_string()),
            new_label("new_label2".to_string(), "bar".to_string()),
        ],
        samples: vec![Sample {
            value: 99.0,
            timestamp: 4000,
        }],
        ..Default::default()
    };

    vec![ts_demo_metrics, ts_multi_labels]
}

/// Mock time series that carry the special `__database__` and `__physical_table__` labels.
pub fn mock_timeseries_special_labels() -> Vec<TimeSeries> {
    let idc3_schema = TimeSeries {
        labels: vec![
            new_label(METRIC_NAME_LABEL.to_string(), "idc3_lo_table".to_string()),
            new_label("__database__".to_string(), "idc3".to_string()),
            new_label("__physical_table__".to_string(), "f1".to_string()),
        ],
        samples: vec![Sample {
            value: 42.0,
            timestamp: 3000,
        }],
        ..Default::default()
    };
    let idc4_schema = TimeSeries {
        labels: vec![
            new_label(
                METRIC_NAME_LABEL.to_string(),
                "idc4_local_table".to_string(),
            ),
            new_label("__database__".to_string(), "idc4".to_string()),
            new_label("__physical_table__".to_string(), "f2".to_string()),
        ],
        samples: vec![Sample {
            value: 99.0,
            timestamp: 4000,
        }],
        ..Default::default()
    };

    vec![idc3_schema, idc4_schema]
}

#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use api::prom_store::remote::LabelMatcher;
    use api::v1::{ColumnDataType, Row, SemanticType};
    use datafusion::prelude::SessionContext;
    use datatypes::schema::{ColumnSchema, Schema};
    use datatypes::vectors::{Float64Vector, StringVector, TimestampMillisecondVector};
    use table::table::adapter::DfTableProviderAdapter;
    use table::test_util::MemTable;

    use super::*;

    const EQ_TYPE: i32 = MatcherType::Eq as i32;
    const NEQ_TYPE: i32 = MatcherType::Neq as i32;
    const RE_TYPE: i32 = MatcherType::Re as i32;

    #[test]
    fn test_table_name() {
        let q = Query {
            start_timestamp_ms: 1000,
            end_timestamp_ms: 2000,
            matchers: vec![],
            ..Default::default()
        };
        let err = table_name(&q).unwrap_err();
        assert!(matches!(err, error::Error::InvalidPromRemoteRequest { .. }));

        let q = Query {
            start_timestamp_ms: 1000,
            end_timestamp_ms: 2000,
            matchers: vec![LabelMatcher {
                name: METRIC_NAME_LABEL.to_string(),
                value: "test".to_string(),
                r#type: EQ_TYPE,
            }],
            ..Default::default()
        };
        assert_eq!("test", table_name(&q).unwrap());
    }
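
    // Sanity-check sketch for `extract_schema_from_read_request` (not part of the
    // original suite): an exact `__schema__` matcher should take precedence over
    // `__database__`, and an empty request should yield `None`.
    #[test]
    fn test_extract_schema_from_read_request() {
        let request = ReadRequest {
            queries: vec![Query {
                matchers: vec![
                    LabelMatcher {
                        name: DATABASE_LABEL.to_string(),
                        value: "db_fallback".to_string(),
                        r#type: EQ_TYPE,
                    },
                    LabelMatcher {
                        name: SCHEMA_LABEL.to_string(),
                        value: "public".to_string(),
                        r#type: EQ_TYPE,
                    },
                ],
                ..Default::default()
            }],
            ..Default::default()
        };
        assert_eq!(
            Some("public".to_string()),
            extract_schema_from_read_request(&request)
        );

        let empty = ReadRequest::default();
        assert_eq!(None, extract_schema_from_read_request(&empty));
    }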

    #[test]
    fn test_query_to_plan() {
        let q = Query {
            start_timestamp_ms: 1000,
            end_timestamp_ms: 2000,
            matchers: vec![LabelMatcher {
                name: METRIC_NAME_LABEL.to_string(),
                value: "test".to_string(),
                r#type: EQ_TYPE,
            }],
            ..Default::default()
        };

        let schema = Arc::new(Schema::new(vec![
            ColumnSchema::new(
                greptime_timestamp(),
                ConcreteDataType::timestamp_millisecond_datatype(),
                true,
            ),
            ColumnSchema::new(greptime_value(), ConcreteDataType::float64_datatype(), true),
            ColumnSchema::new("instance", ConcreteDataType::string_datatype(), true),
            ColumnSchema::new("job", ConcreteDataType::string_datatype(), true),
        ]));
        let recordbatch = RecordBatch::new(
            schema,
            vec![
                Arc::new(TimestampMillisecondVector::from_vec(vec![1000])) as _,
                Arc::new(Float64Vector::from_vec(vec![3.0])) as _,
                Arc::new(StringVector::from(vec!["host1"])) as _,
                Arc::new(StringVector::from(vec!["job"])) as _,
            ],
        )
        .unwrap();

        let ctx = SessionContext::new();
        let table = MemTable::table("test", recordbatch);
        let table_provider = Arc::new(DfTableProviderAdapter::new(table));

        let dataframe = ctx.read_table(table_provider.clone()).unwrap();
        let plan = query_to_plan(DataFrame::DataFusion(dataframe), &q).unwrap();
        let display_string = format!("{}", plan.display_indent());

        let ts_col = greptime_timestamp();
        let expected = format!(
            "Filter: ?table?.{} >= TimestampMillisecond(1000, None) AND ?table?.{} <= TimestampMillisecond(2000, None)\n  TableScan: ?table?",
            ts_col, ts_col
        );
        assert_eq!(expected, display_string);

        let q = Query {
            start_timestamp_ms: 1000,
            end_timestamp_ms: 2000,
            matchers: vec![
                LabelMatcher {
                    name: METRIC_NAME_LABEL.to_string(),
                    value: "test".to_string(),
                    r#type: EQ_TYPE,
                },
                LabelMatcher {
                    name: "job".to_string(),
                    value: "*prom*".to_string(),
                    r#type: RE_TYPE,
                },
                LabelMatcher {
                    name: "instance".to_string(),
                    value: "localhost".to_string(),
                    r#type: NEQ_TYPE,
                },
            ],
            ..Default::default()
        };

        let dataframe = ctx.read_table(table_provider).unwrap();
        let plan = query_to_plan(DataFrame::DataFusion(dataframe), &q).unwrap();
        let display_string = format!("{}", plan.display_indent());

        let ts_col = greptime_timestamp();
        let expected = format!(
            "Filter: ?table?.{} >= TimestampMillisecond(1000, None) AND ?table?.{} <= TimestampMillisecond(2000, None) AND regexp_match(?table?.job, Utf8(\"*prom*\")) IS NOT NULL AND ?table?.instance != Utf8(\"localhost\")\n  TableScan: ?table?",
            ts_col, ts_col
        );
        assert_eq!(expected, display_string);
    }
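
    // Sanity-check sketch for the `TimeSeriesId` ordering defined above: label count
    // is compared first, then label names and values lexicographically.
    #[test]
    fn test_timeseries_id_ordering() {
        let a = TimeSeriesId {
            labels: vec![new_label("a".to_string(), "1".to_string())],
        };
        let b = TimeSeriesId {
            labels: vec![
                new_label("a".to_string(), "1".to_string()),
                new_label("b".to_string(), "2".to_string()),
            ],
        };
        let c = TimeSeriesId {
            labels: vec![new_label("a".to_string(), "2".to_string())],
        };

        // Fewer labels sort first; with equal label counts, values break the tie.
        assert!(a < b);
        assert!(a < c);
        assert_eq!(
            a,
            TimeSeriesId {
                labels: vec![new_label("a".to_string(), "1".to_string())]
            }
        );
    }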

    fn column_schemas_with(
        mut kts_iter: Vec<(&str, ColumnDataType, SemanticType)>,
    ) -> Vec<api::v1::ColumnSchema> {
        kts_iter.push((
            greptime_value(),
            ColumnDataType::Float64,
            SemanticType::Field,
        ));
        kts_iter.push((
            greptime_timestamp(),
            ColumnDataType::TimestampMillisecond,
            SemanticType::Timestamp,
        ));

        kts_iter
            .into_iter()
            .map(|(k, t, s)| api::v1::ColumnSchema {
                column_name: k.to_string(),
                datatype: t as i32,
                semantic_type: s as i32,
                ..Default::default()
            })
            .collect()
    }

    fn make_row_with_label(l1: &str, value: f64, timestamp: i64) -> Row {
        Row {
            values: vec![
                api::v1::Value {
                    value_data: Some(api::v1::value::ValueData::StringValue(l1.to_string())),
                },
                api::v1::Value {
                    value_data: Some(api::v1::value::ValueData::F64Value(value)),
                },
                api::v1::Value {
                    value_data: Some(api::v1::value::ValueData::TimestampMillisecondValue(
                        timestamp,
                    )),
                },
            ],
        }
    }

    fn make_row_with_2_labels(l1: &str, l2: &str, value: f64, timestamp: i64) -> Row {
        Row {
            values: vec![
                api::v1::Value {
                    value_data: Some(api::v1::value::ValueData::StringValue(l1.to_string())),
                },
                api::v1::Value {
                    value_data: Some(api::v1::value::ValueData::StringValue(l2.to_string())),
                },
                api::v1::Value {
                    value_data: Some(api::v1::value::ValueData::F64Value(value)),
                },
                api::v1::Value {
                    value_data: Some(api::v1::value::ValueData::TimestampMillisecondValue(
                        timestamp,
                    )),
                },
            ],
        }
    }

    #[test]
    fn test_write_request_to_row_insert_exprs() {
        let write_request = WriteRequest {
            timeseries: mock_timeseries(),
            ..Default::default()
        };

        let mut exprs = to_grpc_row_insert_requests(&write_request)
            .unwrap()
            .0
            .inserts;
        exprs.sort_unstable_by(|l, r| l.table_name.cmp(&r.table_name));
        assert_eq!(3, exprs.len());
        assert_eq!("metric1", exprs[0].table_name);
        assert_eq!("metric2", exprs[1].table_name);
        assert_eq!("metric3", exprs[2].table_name);

        let rows = exprs[0].rows.as_ref().unwrap();
        let schema = &rows.schema;
        let rows = &rows.rows;
        assert_eq!(2, rows.len());
        assert_eq!(3, schema.len());
        assert_eq!(
            column_schemas_with(vec![("job", ColumnDataType::String, SemanticType::Tag)]),
            *schema
        );
        assert_eq!(
            &vec![
                make_row_with_label("spark", 1.0, 1000),
                make_row_with_label("spark", 2.0, 2000),
            ],
            rows
        );

        let rows = exprs[1].rows.as_ref().unwrap();
        let schema = &rows.schema;
        let rows = &rows.rows;
        assert_eq!(2, rows.len());
        assert_eq!(4, schema.len());
        assert_eq!(
            column_schemas_with(vec![
                ("instance", ColumnDataType::String, SemanticType::Tag),
                ("idc", ColumnDataType::String, SemanticType::Tag)
            ]),
            *schema
        );
        assert_eq!(
            &vec![
                make_row_with_2_labels("test_host1", "z001", 3.0, 1000),
                make_row_with_2_labels("test_host1", "z001", 4.0, 2000),
            ],
            rows
        );

        let rows = exprs[2].rows.as_ref().unwrap();
        let schema = &rows.schema;
        let rows = &rows.rows;
        assert_eq!(3, rows.len());
        assert_eq!(4, schema.len());
        assert_eq!(
            column_schemas_with(vec![
                ("idc", ColumnDataType::String, SemanticType::Tag),
                ("app", ColumnDataType::String, SemanticType::Tag)
            ]),
            *schema
        );
        assert_eq!(
            &vec![
                make_row_with_2_labels("z002", "biz", 5.0, 1000),
                make_row_with_2_labels("z002", "biz", 6.0, 2000),
                make_row_with_2_labels("z002", "biz", 7.0, 3000),
            ],
            rows
        );
    }

    #[test]
    fn test_recordbatches_to_timeseries() {
        let schema = Arc::new(Schema::new(vec![
            ColumnSchema::new(
                greptime_timestamp(),
                ConcreteDataType::timestamp_millisecond_datatype(),
                true,
            ),
            ColumnSchema::new(greptime_value(), ConcreteDataType::float64_datatype(), true),
            ColumnSchema::new("instance", ConcreteDataType::string_datatype(), true),
        ]));

        let recordbatches = RecordBatches::try_new(
            schema.clone(),
            vec![
                RecordBatch::new(
                    schema.clone(),
                    vec![
                        Arc::new(TimestampMillisecondVector::from_vec(vec![1000])) as _,
                        Arc::new(Float64Vector::from_vec(vec![3.0])) as _,
                        Arc::new(StringVector::from(vec!["host1"])) as _,
                    ],
                )
                .unwrap(),
                RecordBatch::new(
                    schema,
                    vec![
                        Arc::new(TimestampMillisecondVector::from_vec(vec![2000])) as _,
                        Arc::new(Float64Vector::from_vec(vec![7.0])) as _,
                        Arc::new(StringVector::from(vec!["host2"])) as _,
                    ],
                )
                .unwrap(),
            ],
        )
        .unwrap();

        let timeseries = recordbatches_to_timeseries("metric1", recordbatches).unwrap();
        assert_eq!(2, timeseries.len());

        assert_eq!(
            vec![
                Label {
                    name: METRIC_NAME_LABEL.to_string(),
                    value: "metric1".to_string(),
                },
                Label {
                    name: "instance".to_string(),
                    value: "host1".to_string(),
                },
            ],
            timeseries[0].labels
        );

        assert_eq!(
            timeseries[0].samples,
            vec![Sample {
                value: 3.0,
                timestamp: 1000,
            }]
        );

        assert_eq!(
            vec![
                Label {
                    name: METRIC_NAME_LABEL.to_string(),
                    value: "metric1".to_string(),
                },
                Label {
                    name: "instance".to_string(),
                    value: "host2".to_string(),
                },
            ],
            timeseries[1].labels
        );
        assert_eq!(
            timeseries[1].samples,
            vec![Sample {
                value: 7.0,
                timestamp: 2000,
            }]
        );
    }
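
    // Round-trip sanity check for the snappy helpers above; both use the raw block
    // format expected by the Prometheus remote protocol.
    #[test]
    fn test_snappy_roundtrip() {
        let payload = b"prometheus remote write payload";
        let compressed = snappy_compress(payload).unwrap();
        let decompressed = snappy_decompress(&compressed).unwrap();
        assert_eq!(payload.as_slice(), decompressed.as_slice());
    }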
}