1use std::cmp::Ordering;
18use std::collections::BTreeMap;
19use std::hash::{Hash, Hasher};
20
21use api::prom_store::remote::label_matcher::Type as MatcherType;
22use api::prom_store::remote::{Label, Query, ReadRequest, Sample, TimeSeries, WriteRequest};
23use api::v1::RowInsertRequests;
24use arrow::array::{Array, AsArray};
25use arrow::datatypes::{Float64Type, TimestampMillisecondType};
26use common_grpc::precision::Precision;
27use common_query::prelude::{greptime_timestamp, greptime_value};
28use common_recordbatch::{RecordBatch, RecordBatches};
29use common_telemetry::{tracing, warn};
30use datafusion::dataframe::DataFrame;
31use datafusion::prelude::{Expr, col, lit, regexp_match};
32use datafusion_common::ScalarValue;
33use datafusion_expr::LogicalPlan;
34use openmetrics_parser::{MetricsExposition, PrometheusType, PrometheusValue};
35use snafu::{OptionExt, ResultExt};
36use snap::raw::{Decoder, Encoder};
37
38use crate::error::{self, Result};
39use crate::row_writer::{self, MultiTableData};
40
/// Name of the label that carries the metric name. In this file its value is
/// used as the table name for both reads and writes.
pub const METRIC_NAME_LABEL: &str = "__name__";
/// Byte form of [`METRIC_NAME_LABEL`].
pub const METRIC_NAME_LABEL_BYTES: &[u8] = METRIC_NAME_LABEL.as_bytes();

/// Name of the `__database__` label. Read requests fall back to it when no
/// `__schema__` equality matcher is present.
pub const DATABASE_LABEL: &str = "__database__";
/// Byte form of [`DATABASE_LABEL`].
pub const DATABASE_LABEL_BYTES: &[u8] = DATABASE_LABEL.as_bytes();

/// Name of the `__schema__` label. Read requests consult it first when
/// resolving the target schema.
pub const SCHEMA_LABEL: &str = "__schema__";
/// Byte form of [`SCHEMA_LABEL`].
pub const SCHEMA_LABEL_BYTES: &[u8] = SCHEMA_LABEL.as_bytes();

/// Name of the `__physical_table__` label.
// NOTE(review): presumably selects the physical table on write; nothing in
// this file interprets it beyond the mock data — confirm against callers.
pub const PHYSICAL_TABLE_LABEL: &str = "__physical_table__";
/// Byte form of [`PHYSICAL_TABLE_LABEL`].
pub const PHYSICAL_TABLE_LABEL_BYTES: &[u8] = PHYSICAL_TABLE_LABEL.as_bytes();

/// Name of the `__field__` label. Not referenced by any function in this file.
pub const FIELD_NAME_LABEL: &str = "__field__";
55
56pub struct Metrics {
58 pub exposition: MetricsExposition<PrometheusType, PrometheusValue>,
59}
60
61pub fn table_name(q: &Query) -> Result<String> {
63 let label_matches = &q.matchers;
64
65 label_matches
66 .iter()
67 .find_map(|m| {
68 if m.name == METRIC_NAME_LABEL {
69 Some(m.value.clone())
70 } else {
71 None
72 }
73 })
74 .context(error::InvalidPromRemoteRequestSnafu {
75 msg: "missing '__name__' label in timeseries",
76 })
77}
78
79pub fn extract_schema_from_read_request(request: &ReadRequest) -> Option<String> {
82 for query in &request.queries {
83 for matcher in &query.matchers {
84 if matcher.name == SCHEMA_LABEL && matcher.r#type == MatcherType::Eq as i32 {
85 return Some(matcher.value.clone());
86 }
87 }
88 }
89
90 for query in &request.queries {
92 for matcher in &query.matchers {
93 if matcher.name == DATABASE_LABEL && matcher.r#type == MatcherType::Eq as i32 {
94 return Some(matcher.value.clone());
95 }
96 }
97 }
98
99 None
100}
101
102#[tracing::instrument(skip_all)]
104pub fn query_to_plan(dataframe: DataFrame, q: &Query) -> Result<LogicalPlan> {
105 let start_timestamp_ms = q.start_timestamp_ms;
106 let end_timestamp_ms = q.end_timestamp_ms;
107
108 let label_matches = &q.matchers;
109
110 let mut conditions = Vec::with_capacity(label_matches.len() + 1);
111
112 conditions.push(col(greptime_timestamp()).gt_eq(lit_timestamp_millisecond(start_timestamp_ms)));
113 conditions.push(col(greptime_timestamp()).lt_eq(lit_timestamp_millisecond(end_timestamp_ms)));
114
115 for m in label_matches {
116 let name = &m.name;
117
118 if name == METRIC_NAME_LABEL || name == SCHEMA_LABEL || name == DATABASE_LABEL {
119 continue;
120 }
121
122 let value = &m.value;
123 let m_type = MatcherType::try_from(m.r#type).map_err(|e| {
124 error::InvalidPromRemoteRequestSnafu {
125 msg: format!("invalid LabelMatcher type, decode error: {e}",),
126 }
127 .build()
128 })?;
129
130 match m_type {
131 MatcherType::Eq => {
132 conditions.push(col(name).eq(lit(value)));
133 }
134 MatcherType::Neq => {
135 conditions.push(col(name).not_eq(lit(value)));
136 }
137 MatcherType::Re => {
139 conditions.push(regexp_match(col(name), lit(value), None).is_not_null());
140 }
141 MatcherType::Nre => {
143 conditions.push(regexp_match(col(name), lit(value), None).is_null());
144 }
145 }
146 }
147
148 let conditions = conditions.into_iter().reduce(Expr::and).unwrap();
150
151 let dataframe = dataframe
152 .filter(conditions)
153 .context(error::DataFrameSnafu)?;
154
155 Ok(dataframe.into_parts().1)
156}
157
158#[inline]
159fn new_label(name: String, value: String) -> Label {
160 Label { name, value }
161}
162
163fn lit_timestamp_millisecond(ts: i64) -> Expr {
164 Expr::Literal(ScalarValue::TimestampMillisecond(Some(ts), None), None)
165}
166
167#[derive(Debug)]
169struct TimeSeriesId {
170 labels: Vec<Label>,
171}
172
173impl PartialEq for TimeSeriesId {
175 fn eq(&self, other: &Self) -> bool {
176 if self.labels.len() != other.labels.len() {
177 return false;
178 }
179
180 self.labels
181 .iter()
182 .zip(other.labels.iter())
183 .all(|(l, r)| l.name == r.name && l.value == r.value)
184 }
185}
186impl Eq for TimeSeriesId {}
187
188impl Hash for TimeSeriesId {
189 fn hash<H: Hasher>(&self, state: &mut H) {
190 for label in &self.labels {
191 label.name.hash(state);
192 label.value.hash(state);
193 }
194 }
195}
196
197impl Ord for TimeSeriesId {
199 fn cmp(&self, other: &Self) -> Ordering {
200 let ordering = self.labels.len().cmp(&other.labels.len());
201 if ordering != Ordering::Equal {
202 return ordering;
203 }
204
205 for (l, r) in self.labels.iter().zip(other.labels.iter()) {
206 let ordering = l.name.cmp(&r.name);
207
208 if ordering != Ordering::Equal {
209 return ordering;
210 }
211
212 let ordering = l.value.cmp(&r.value);
213
214 if ordering != Ordering::Equal {
215 return ordering;
216 }
217 }
218 Ordering::Equal
219 }
220}
221
222impl PartialOrd for TimeSeriesId {
223 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
224 Some(self.cmp(other))
225 }
226}
227
228fn collect_timeseries_ids(table_name: &str, recordbatch: &RecordBatch) -> Vec<TimeSeriesId> {
231 let row_count = recordbatch.num_rows();
232 let mut timeseries_ids = Vec::with_capacity(row_count);
233
234 let column_names = recordbatch
235 .schema
236 .column_schemas()
237 .iter()
238 .map(|column_schema| &column_schema.name);
239 let columns = column_names
240 .enumerate()
241 .filter(|(_, column_name)| {
242 *column_name != greptime_timestamp() && *column_name != greptime_value()
243 })
244 .map(|(i, column_name)| {
245 (
246 column_name,
247 recordbatch.iter_column_as_string(i).collect::<Vec<_>>(),
248 )
249 })
250 .collect::<Vec<_>>();
251
252 for row in 0..row_count {
253 let mut labels = Vec::with_capacity(recordbatch.num_columns() - 1);
254 labels.push(new_label(
255 METRIC_NAME_LABEL.to_string(),
256 table_name.to_string(),
257 ));
258
259 for (column_name, column_values) in columns.iter() {
260 if let Some(value) = &column_values[row] {
261 labels.push(new_label((*column_name).clone(), value.clone()));
262 }
263 }
264 timeseries_ids.push(TimeSeriesId { labels });
265 }
266 timeseries_ids
267}
268
269pub fn recordbatches_to_timeseries(
270 table_name: &str,
271 recordbatches: RecordBatches,
272) -> Result<Vec<TimeSeries>> {
273 Ok(recordbatches
274 .take()
275 .into_iter()
276 .map(|x| recordbatch_to_timeseries(table_name, x))
277 .collect::<Result<Vec<_>>>()?
278 .into_iter()
279 .flatten()
280 .collect())
281}
282
283fn recordbatch_to_timeseries(table: &str, recordbatch: RecordBatch) -> Result<Vec<TimeSeries>> {
284 let ts_column = recordbatch.column_by_name(greptime_timestamp()).context(
285 error::InvalidPromRemoteReadQueryResultSnafu {
286 msg: "missing greptime_timestamp column in query result",
287 },
288 )?;
289 let ts_column = ts_column
290 .as_primitive_opt::<TimestampMillisecondType>()
291 .with_context(|| error::InvalidPromRemoteReadQueryResultSnafu {
292 msg: format!(
293 "Expect timestamp column of datatype Timestamp(Millisecond), actual {:?}",
294 ts_column.data_type()
295 ),
296 })?;
297
298 let field_column = recordbatch.column_by_name(greptime_value()).context(
299 error::InvalidPromRemoteReadQueryResultSnafu {
300 msg: "missing greptime_value column in query result",
301 },
302 )?;
303 let field_column = field_column
304 .as_primitive_opt::<Float64Type>()
305 .with_context(|| error::InvalidPromRemoteReadQueryResultSnafu {
306 msg: format!(
307 "Expect value column of datatype Float64, actual {:?}",
308 field_column.data_type()
309 ),
310 })?;
311
312 let timeseries_ids = collect_timeseries_ids(table, &recordbatch);
314 let mut timeseries_map: BTreeMap<&TimeSeriesId, TimeSeries> = BTreeMap::default();
316
317 for (row, timeseries_id) in timeseries_ids.iter().enumerate() {
318 let timeseries = timeseries_map
319 .entry(timeseries_id)
320 .or_insert_with(|| TimeSeries {
321 labels: timeseries_id.labels.clone(),
322 ..Default::default()
323 });
324
325 if ts_column.is_null(row) || field_column.is_null(row) {
326 continue;
327 }
328
329 let value = field_column.value(row);
330 let timestamp = ts_column.value(row);
331 let sample = Sample { value, timestamp };
332
333 timeseries.samples.push(sample);
334 }
335
336 Ok(timeseries_map.into_values().collect())
337}
338
339pub fn to_grpc_row_insert_requests(request: &WriteRequest) -> Result<(RowInsertRequests, usize)> {
340 let _timer = crate::metrics::METRIC_HTTP_PROM_STORE_CONVERT_ELAPSED.start_timer();
341
342 let mut multi_table_data = MultiTableData::new();
343
344 for series in &request.timeseries {
345 let table_name = &series
346 .labels
347 .iter()
348 .find(|label| {
349 label.name == METRIC_NAME_LABEL
351 })
352 .context(error::InvalidPromRemoteRequestSnafu {
353 msg: "missing '__name__' label in time-series",
354 })?
355 .value;
356
357 let num_columns = series.labels.len() + 1;
360
361 let table_data = multi_table_data.get_or_default_table_data(
362 table_name,
363 num_columns,
364 series.samples.len(),
365 );
366
367 let kvs = series.labels.iter().filter_map(|label| {
369 if label.name == METRIC_NAME_LABEL {
370 None
371 } else {
372 Some((label.name.clone(), label.value.clone()))
373 }
374 });
375
376 if series.samples.len() == 1 {
377 let mut one_row = table_data.alloc_one_row();
378
379 row_writer::write_tags(table_data, kvs, &mut one_row)?;
380 row_writer::write_f64(
382 table_data,
383 greptime_value(),
384 series.samples[0].value,
385 &mut one_row,
386 )?;
387 row_writer::write_ts_to_millis(
389 table_data,
390 greptime_timestamp(),
391 Some(series.samples[0].timestamp),
392 Precision::Millisecond,
393 &mut one_row,
394 )?;
395
396 table_data.add_row(one_row);
397 } else {
398 for Sample { value, timestamp } in &series.samples {
399 let mut one_row = table_data.alloc_one_row();
400
401 let kvs = kvs.clone();
403 row_writer::write_tags(table_data, kvs, &mut one_row)?;
404 row_writer::write_f64(table_data, greptime_value(), *value, &mut one_row)?;
406 row_writer::write_ts_to_millis(
408 table_data,
409 greptime_timestamp(),
410 Some(*timestamp),
411 Precision::Millisecond,
412 &mut one_row,
413 )?;
414
415 table_data.add_row(one_row);
416 }
417 }
418
419 if !series.histograms.is_empty() {
420 warn!("Native histograms are not supported yet, data ignored");
421 }
422 }
423
424 Ok(multi_table_data.into_row_insert_requests())
425}
426
427#[inline]
428pub fn snappy_decompress(buf: &[u8]) -> Result<Vec<u8>> {
429 let mut decoder = Decoder::new();
430 decoder
431 .decompress_vec(buf)
432 .context(error::DecompressSnappyPromRemoteRequestSnafu)
433}
434
435#[inline]
436pub fn snappy_compress(buf: &[u8]) -> Result<Vec<u8>> {
437 let mut encoder = Encoder::new();
438 encoder
439 .compress_vec(buf)
440 .context(error::CompressPromRemoteRequestSnafu)
441}
442
443#[inline]
444pub fn zstd_decompress(buf: &[u8]) -> Result<Vec<u8>> {
445 zstd::stream::decode_all(buf).context(error::DecompressZstdPromRemoteRequestSnafu)
446}
447
448pub fn mock_timeseries() -> Vec<TimeSeries> {
451 vec![
452 TimeSeries {
453 labels: vec![
454 new_label(METRIC_NAME_LABEL.to_string(), "metric1".to_string()),
455 new_label("job".to_string(), "spark".to_string()),
456 ],
457 samples: vec![
458 Sample {
459 value: 1.0f64,
460 timestamp: 1000,
461 },
462 Sample {
463 value: 2.0f64,
464 timestamp: 2000,
465 },
466 ],
467 ..Default::default()
468 },
469 TimeSeries {
470 labels: vec![
471 new_label(METRIC_NAME_LABEL.to_string(), "metric2".to_string()),
472 new_label("instance".to_string(), "test_host1".to_string()),
473 new_label("idc".to_string(), "z001".to_string()),
474 ],
475 samples: vec![
476 Sample {
477 value: 3.0f64,
478 timestamp: 1000,
479 },
480 Sample {
481 value: 4.0f64,
482 timestamp: 2000,
483 },
484 ],
485 ..Default::default()
486 },
487 TimeSeries {
488 labels: vec![
489 new_label(METRIC_NAME_LABEL.to_string(), "metric3".to_string()),
490 new_label("idc".to_string(), "z002".to_string()),
491 new_label("app".to_string(), "biz".to_string()),
492 ],
493 samples: vec![
494 Sample {
495 value: 5.0f64,
496 timestamp: 1000,
497 },
498 Sample {
499 value: 6.0f64,
500 timestamp: 2000,
501 },
502 Sample {
503 value: 7.0f64,
504 timestamp: 3000,
505 },
506 ],
507 ..Default::default()
508 },
509 ]
510}
511
512pub fn mock_timeseries_new_label() -> Vec<TimeSeries> {
514 let ts_demo_metrics = TimeSeries {
515 labels: vec![
516 new_label(METRIC_NAME_LABEL.to_string(), "demo_metrics".to_string()),
517 new_label("idc".to_string(), "idc3".to_string()),
518 new_label("new_label1".to_string(), "foo".to_string()),
519 ],
520 samples: vec![Sample {
521 value: 42.0,
522 timestamp: 3000,
523 }],
524 ..Default::default()
525 };
526 let ts_multi_labels = TimeSeries {
527 labels: vec![
528 new_label(METRIC_NAME_LABEL.to_string(), "metric1".to_string()),
529 new_label("idc".to_string(), "idc4".to_string()),
530 new_label("env".to_string(), "prod".to_string()),
531 new_label("host".to_string(), "host9".to_string()),
532 new_label("new_label2".to_string(), "bar".to_string()),
533 ],
534 samples: vec![Sample {
535 value: 99.0,
536 timestamp: 4000,
537 }],
538 ..Default::default()
539 };
540
541 vec![ts_demo_metrics, ts_multi_labels]
542}
543
544pub fn mock_timeseries_special_labels() -> Vec<TimeSeries> {
546 let idc3_schema = TimeSeries {
547 labels: vec![
548 new_label(METRIC_NAME_LABEL.to_string(), "idc3_lo_table".to_string()),
549 new_label("__database__".to_string(), "idc3".to_string()),
550 new_label("__physical_table__".to_string(), "f1".to_string()),
551 ],
552 samples: vec![Sample {
553 value: 42.0,
554 timestamp: 3000,
555 }],
556 ..Default::default()
557 };
558 let idc4_schema = TimeSeries {
559 labels: vec![
560 new_label(
561 METRIC_NAME_LABEL.to_string(),
562 "idc4_local_table".to_string(),
563 ),
564 new_label("__database__".to_string(), "idc4".to_string()),
565 new_label("__physical_table__".to_string(), "f2".to_string()),
566 ],
567 samples: vec![Sample {
568 value: 99.0,
569 timestamp: 4000,
570 }],
571 ..Default::default()
572 };
573
574 vec![idc3_schema, idc4_schema]
575}
576
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use api::prom_store::remote::LabelMatcher;
    use api::v1::{ColumnDataType, Row, SemanticType};
    use datafusion::prelude::SessionContext;
    use datatypes::data_type::ConcreteDataType;
    use datatypes::schema::{ColumnSchema, Schema};
    use datatypes::vectors::{Float64Vector, StringVector, TimestampMillisecondVector};
    use table::table::adapter::DfTableProviderAdapter;
    use table::test_util::MemTable;

    use super::*;

    const EQ_TYPE: i32 = MatcherType::Eq as i32;
    const NEQ_TYPE: i32 = MatcherType::Neq as i32;
    const RE_TYPE: i32 = MatcherType::Re as i32;

    /// Builds a label matcher of the given raw type.
    fn matcher(name: &str, value: &str, r#type: i32) -> LabelMatcher {
        LabelMatcher {
            name: name.to_string(),
            value: value.to_string(),
            r#type,
        }
    }

    /// Builds a query over `[1000, 2000]` ms with the given matchers.
    fn query_with(matchers: Vec<LabelMatcher>) -> Query {
        Query {
            start_timestamp_ms: 1000,
            end_timestamp_ms: 2000,
            matchers,
            ..Default::default()
        }
    }

    #[test]
    fn test_table_name() {
        // Without a `__name__` matcher there is no table to read from.
        let without_name = query_with(vec![]);
        let err = table_name(&without_name).unwrap_err();
        assert!(matches!(err, error::Error::InvalidPromRemoteRequest { .. }));

        let with_name = query_with(vec![matcher(METRIC_NAME_LABEL, "test", EQ_TYPE)]);
        assert_eq!("test", table_name(&with_name).unwrap());
    }

    #[test]
    fn test_query_to_plan() {
        let schema = Arc::new(Schema::new(vec![
            ColumnSchema::new(
                greptime_timestamp(),
                ConcreteDataType::timestamp_millisecond_datatype(),
                true,
            ),
            ColumnSchema::new(greptime_value(), ConcreteDataType::float64_datatype(), true),
            ColumnSchema::new("instance", ConcreteDataType::string_datatype(), true),
            ColumnSchema::new("job", ConcreteDataType::string_datatype(), true),
        ]));
        let recordbatch = RecordBatch::new(
            schema,
            vec![
                Arc::new(TimestampMillisecondVector::from_vec(vec![1000])) as _,
                Arc::new(Float64Vector::from_vec(vec![3.0])) as _,
                Arc::new(StringVector::from(vec!["host1"])) as _,
                Arc::new(StringVector::from(vec!["job"])) as _,
            ],
        )
        .unwrap();

        let ctx = SessionContext::new();
        let table = MemTable::table("test", recordbatch);
        let table_provider = Arc::new(DfTableProviderAdapter::new(table));

        // Renders the plan built for `q` against a fresh scan of the table.
        let plan_text = |q: &Query| {
            let dataframe = ctx.read_table(table_provider.clone()).unwrap();
            let plan = query_to_plan(dataframe, q).unwrap();
            format!("{}", plan.display_indent())
        };

        let ts_col = greptime_timestamp();

        // Only `__name__`: the filter is just the time range.
        // `display_indent` indents each child level by two spaces.
        let name_only = query_with(vec![matcher(METRIC_NAME_LABEL, "test", EQ_TYPE)]);
        let expected = format!(
            "Filter: ?table?.{ts_col} >= TimestampMillisecond(1000, None) AND ?table?.{ts_col} <= TimestampMillisecond(2000, None)\n  TableScan: ?table?"
        );
        assert_eq!(expected, plan_text(&name_only));

        // Additional matchers are appended in order after the time range.
        let with_labels = query_with(vec![
            matcher(METRIC_NAME_LABEL, "test", EQ_TYPE),
            matcher("job", "*prom*", RE_TYPE),
            matcher("instance", "localhost", NEQ_TYPE),
        ]);
        let expected = format!(
            "Filter: ?table?.{ts_col} >= TimestampMillisecond(1000, None) AND ?table?.{ts_col} <= TimestampMillisecond(2000, None) AND regexp_match(?table?.job, Utf8(\"*prom*\")) IS NOT NULL AND ?table?.instance != Utf8(\"localhost\")\n  TableScan: ?table?"
        );
        assert_eq!(expected, plan_text(&with_labels));
    }

    /// Column schemas for the given tag columns followed by the value and
    /// timestamp columns.
    fn column_schemas_with(
        kts_iter: Vec<(&str, ColumnDataType, SemanticType)>,
    ) -> Vec<api::v1::ColumnSchema> {
        let trailing = [
            (
                greptime_value(),
                ColumnDataType::Float64,
                SemanticType::Field,
            ),
            (
                greptime_timestamp(),
                ColumnDataType::TimestampMillisecond,
                SemanticType::Timestamp,
            ),
        ];

        kts_iter
            .into_iter()
            .chain(trailing)
            .map(|(k, t, s)| api::v1::ColumnSchema {
                column_name: k.to_string(),
                datatype: t as i32,
                semantic_type: s as i32,
                ..Default::default()
            })
            .collect()
    }

    /// Row made of the given string labels followed by the value and timestamp.
    fn make_row(labels: &[&str], value: f64, timestamp: i64) -> Row {
        use api::v1::value::ValueData;

        let cell = |value_data| api::v1::Value {
            value_data: Some(value_data),
        };

        let mut values: Vec<_> = labels
            .iter()
            .map(|label| cell(ValueData::StringValue(label.to_string())))
            .collect();
        values.push(cell(ValueData::F64Value(value)));
        values.push(cell(ValueData::TimestampMillisecondValue(timestamp)));

        Row { values }
    }

    fn make_row_with_label(l1: &str, value: f64, timestamp: i64) -> Row {
        make_row(&[l1], value, timestamp)
    }

    fn make_row_with_2_labels(l1: &str, l2: &str, value: f64, timestamp: i64) -> Row {
        make_row(&[l1, l2], value, timestamp)
    }

    #[test]
    fn test_write_request_to_row_insert_exprs() {
        let write_request = WriteRequest {
            timeseries: mock_timeseries(),
            ..Default::default()
        };

        let mut exprs = to_grpc_row_insert_requests(&write_request)
            .unwrap()
            .0
            .inserts;
        exprs.sort_unstable_by(|l, r| l.table_name.cmp(&r.table_name));
        assert_eq!(3, exprs.len());
        assert_eq!("metric1", exprs[0].table_name);
        assert_eq!("metric2", exprs[1].table_name);
        assert_eq!("metric3", exprs[2].table_name);

        // Checks row count, column count, schema and rows of one insert request.
        let assert_table = |index: usize,
                            row_count: usize,
                            column_count: usize,
                            tags: Vec<(&str, ColumnDataType, SemanticType)>,
                            expected_rows: Vec<Row>| {
            let rows = exprs[index].rows.as_ref().unwrap();
            assert_eq!(row_count, rows.rows.len());
            assert_eq!(column_count, rows.schema.len());
            assert_eq!(column_schemas_with(tags), rows.schema);
            assert_eq!(expected_rows, rows.rows);
        };

        assert_table(
            0,
            2,
            3,
            vec![("job", ColumnDataType::String, SemanticType::Tag)],
            vec![
                make_row_with_label("spark", 1.0, 1000),
                make_row_with_label("spark", 2.0, 2000),
            ],
        );

        assert_table(
            1,
            2,
            4,
            vec![
                ("instance", ColumnDataType::String, SemanticType::Tag),
                ("idc", ColumnDataType::String, SemanticType::Tag),
            ],
            vec![
                make_row_with_2_labels("test_host1", "z001", 3.0, 1000),
                make_row_with_2_labels("test_host1", "z001", 4.0, 2000),
            ],
        );

        assert_table(
            2,
            3,
            4,
            vec![
                ("idc", ColumnDataType::String, SemanticType::Tag),
                ("app", ColumnDataType::String, SemanticType::Tag),
            ],
            vec![
                make_row_with_2_labels("z002", "biz", 5.0, 1000),
                make_row_with_2_labels("z002", "biz", 6.0, 2000),
                make_row_with_2_labels("z002", "biz", 7.0, 3000),
            ],
        );
    }

    #[test]
    fn test_recordbatches_to_timeseries() {
        let schema = Arc::new(Schema::new(vec![
            ColumnSchema::new(
                greptime_timestamp(),
                ConcreteDataType::timestamp_millisecond_datatype(),
                true,
            ),
            ColumnSchema::new(greptime_value(), ConcreteDataType::float64_datatype(), true),
            ColumnSchema::new("instance", ConcreteDataType::string_datatype(), true),
        ]));

        // Single-row batch with the given timestamp, value and instance.
        let single_row_batch = |timestamp: i64, value: f64, instance: &'static str| {
            RecordBatch::new(
                schema.clone(),
                vec![
                    Arc::new(TimestampMillisecondVector::from_vec(vec![timestamp])) as _,
                    Arc::new(Float64Vector::from_vec(vec![value])) as _,
                    Arc::new(StringVector::from(vec![instance])) as _,
                ],
            )
            .unwrap()
        };

        let recordbatches = RecordBatches::try_new(
            schema.clone(),
            vec![
                single_row_batch(1000, 3.0, "host1"),
                single_row_batch(2000, 7.0, "host2"),
            ],
        )
        .unwrap();

        let timeseries = recordbatches_to_timeseries("metric1", recordbatches).unwrap();
        assert_eq!(2, timeseries.len());

        let expected_labels = |instance: &str| {
            vec![
                Label {
                    name: METRIC_NAME_LABEL.to_string(),
                    value: "metric1".to_string(),
                },
                Label {
                    name: "instance".to_string(),
                    value: instance.to_string(),
                },
            ]
        };

        assert_eq!(expected_labels("host1"), timeseries[0].labels);
        assert_eq!(
            timeseries[0].samples,
            vec![Sample {
                value: 3.0,
                timestamp: 1000,
            }]
        );

        assert_eq!(expected_labels("host2"), timeseries[1].labels);
        assert_eq!(
            timeseries[1].samples,
            vec![Sample {
                value: 7.0,
                timestamp: 2000,
            }]
        );
    }
}