1use std::cmp::Ordering;
18use std::collections::BTreeMap;
19use std::hash::{Hash, Hasher};
20
21use api::prom_store::remote::label_matcher::Type as MatcherType;
22use api::prom_store::remote::{Label, Query, ReadRequest, Sample, TimeSeries, WriteRequest};
23use api::v1::RowInsertRequests;
24use arrow::array::{Array, AsArray};
25use arrow::datatypes::{Float64Type, TimestampMillisecondType};
26use common_grpc::precision::Precision;
27use common_query::prelude::{greptime_timestamp, greptime_value};
28use common_recordbatch::{RecordBatch, RecordBatches};
29use common_telemetry::{tracing, warn};
30use datafusion::dataframe::DataFrame;
31use datafusion::prelude::{Expr, col, lit, regexp_match};
32use datafusion_common::ScalarValue;
33use datafusion_expr::LogicalPlan;
34use openmetrics_parser::{MetricsExposition, PrometheusType, PrometheusValue};
35use snafu::{OptionExt, ResultExt};
36use snap::raw::{Decoder, Encoder};
37
38use crate::error::{self, Result};
39use crate::row_writer::{self, MultiTableData};
40
/// Name of the label that carries the metric name. Within this module its
/// value is used as the table name (see `table_name` and
/// `to_grpc_row_insert_requests`).
pub const METRIC_NAME_LABEL: &str = "__name__";
/// Byte-string form of [`METRIC_NAME_LABEL`].
pub const METRIC_NAME_LABEL_BYTES: &[u8] = b"__name__";

/// Label name recognised as a database selector
/// (see `is_database_selection_label`).
pub const DATABASE_LABEL: &str = "x_greptime_database";
/// Byte-string form of [`DATABASE_LABEL`].
pub const DATABASE_LABEL_BYTES: &[u8] = b"x_greptime_database";
/// Alternative spelling accepted wherever [`DATABASE_LABEL`] is.
pub const DATABASE_LABEL_ALT: &str = "__database__";
/// Byte-string form of [`DATABASE_LABEL_ALT`].
pub const DATABASE_LABEL_ALT_BYTES: &[u8] = b"__database__";

/// Legacy database selector; still matched by the `is_*_label` helpers in
/// this module.
#[deprecated(note = "use DATABASE_LABEL instead")]
pub const SCHEMA_LABEL: &str = "__schema__";
/// Byte-string form of [`SCHEMA_LABEL`].
#[deprecated(note = "use DATABASE_LABEL_BYTES instead")]
pub const SCHEMA_LABEL_BYTES: &[u8] = b"__schema__";

/// Label name recognised as a physical-table selector
/// (see `is_physical_table_selection_label`).
pub const PHYSICAL_TABLE_LABEL: &str = "x_greptime_physical_table";
/// Byte-string form of [`PHYSICAL_TABLE_LABEL`].
pub const PHYSICAL_TABLE_LABEL_BYTES: &[u8] = b"x_greptime_physical_table";
/// Alternative spelling accepted wherever [`PHYSICAL_TABLE_LABEL`] is.
pub const PHYSICAL_TABLE_LABEL_ALT: &str = "__physical_table__";
/// Byte-string form of [`PHYSICAL_TABLE_LABEL_ALT`].
pub const PHYSICAL_TABLE_LABEL_ALT_BYTES: &[u8] = b"__physical_table__";

// NOTE(review): not referenced anywhere in this file; presumably names the
// label that selects a field column — confirm against its users.
pub const FIELD_NAME_LABEL: &str = "__field__";
64
65#[allow(deprecated)]
67pub fn is_remote_write_special_label(label: &str) -> bool {
68 label == DATABASE_LABEL
69 || label == DATABASE_LABEL_ALT
70 || label == PHYSICAL_TABLE_LABEL
71 || label == PHYSICAL_TABLE_LABEL_ALT
72 || label == SCHEMA_LABEL
73}
74
75#[allow(deprecated)]
76pub fn is_remote_read_special_label(label: &str) -> bool {
77 label == METRIC_NAME_LABEL
78 || label == DATABASE_LABEL
79 || label == DATABASE_LABEL_ALT
80 || label == SCHEMA_LABEL
81}
82
83#[allow(deprecated)]
85pub fn is_database_selection_label(label: &str) -> bool {
86 label == DATABASE_LABEL || label == DATABASE_LABEL_ALT || label == SCHEMA_LABEL
87}
88
89pub fn is_physical_table_selection_label(label: &str) -> bool {
91 label == PHYSICAL_TABLE_LABEL || label == PHYSICAL_TABLE_LABEL_ALT
92}
93
/// Thin wrapper around an `openmetrics_parser` exposition typed with the
/// Prometheus metric type and value.
pub struct Metrics {
    /// The wrapped exposition.
    pub exposition: MetricsExposition<PrometheusType, PrometheusValue>,
}
98
99pub fn table_name(q: &Query) -> Result<String> {
101 let label_matches = &q.matchers;
102
103 label_matches
104 .iter()
105 .find_map(|m| {
106 if m.name == METRIC_NAME_LABEL {
107 Some(m.value.clone())
108 } else {
109 None
110 }
111 })
112 .context(error::InvalidPromRemoteRequestSnafu {
113 msg: "missing '__name__' label in timeseries",
114 })
115}
116
117pub fn extract_schema_from_read_request(request: &ReadRequest) -> Option<String> {
119 for query in &request.queries {
120 for matcher in &query.matchers {
121 if is_database_selection_label(&matcher.name)
122 && matcher.r#type == MatcherType::Eq as i32
123 {
124 return Some(matcher.value.clone());
125 }
126 }
127 }
128
129 None
130}
131
132#[tracing::instrument(skip_all)]
134pub fn query_to_plan(dataframe: DataFrame, q: &Query) -> Result<LogicalPlan> {
135 let start_timestamp_ms = q.start_timestamp_ms;
136 let end_timestamp_ms = q.end_timestamp_ms;
137
138 let label_matches = &q.matchers;
139
140 let mut conditions = Vec::with_capacity(label_matches.len() + 1);
141
142 conditions.push(col(greptime_timestamp()).gt_eq(lit_timestamp_millisecond(start_timestamp_ms)));
143 conditions.push(col(greptime_timestamp()).lt_eq(lit_timestamp_millisecond(end_timestamp_ms)));
144
145 for m in label_matches {
146 let name = &m.name;
147
148 if is_remote_read_special_label(name) {
149 continue;
150 }
151
152 let value = &m.value;
153 let m_type = MatcherType::try_from(m.r#type).map_err(|e| {
154 error::InvalidPromRemoteRequestSnafu {
155 msg: format!("invalid LabelMatcher type, decode error: {e}",),
156 }
157 .build()
158 })?;
159
160 match m_type {
161 MatcherType::Eq => {
162 conditions.push(col(name).eq(lit(value)));
163 }
164 MatcherType::Neq => {
165 conditions.push(col(name).not_eq(lit(value)));
166 }
167 MatcherType::Re => {
169 conditions.push(regexp_match(col(name), lit(value), None).is_not_null());
170 }
171 MatcherType::Nre => {
173 conditions.push(regexp_match(col(name), lit(value), None).is_null());
174 }
175 }
176 }
177
178 let conditions = conditions.into_iter().reduce(Expr::and).unwrap();
180
181 let dataframe = dataframe
182 .filter(conditions)
183 .context(error::DataFrameSnafu)?;
184
185 Ok(dataframe.into_parts().1)
186}
187
/// Builds a [`Label`] from an owned name/value pair.
#[inline]
fn new_label(name: String, value: String) -> Label {
    Label { name, value }
}
192
193fn lit_timestamp_millisecond(ts: i64) -> Expr {
194 Expr::Literal(ScalarValue::TimestampMillisecond(Some(ts), None), None)
195}
196
/// Identity of a time series: its labels, in order.
///
/// Equality, hashing and ordering are implemented by hand below and look
/// only at each label's `name` and `value`.
#[derive(Debug)]
struct TimeSeriesId {
    /// Labels in the order they were collected; the order is significant for
    /// comparison.
    labels: Vec<Label>,
}
202
203impl PartialEq for TimeSeriesId {
205 fn eq(&self, other: &Self) -> bool {
206 if self.labels.len() != other.labels.len() {
207 return false;
208 }
209
210 self.labels
211 .iter()
212 .zip(other.labels.iter())
213 .all(|(l, r)| l.name == r.name && l.value == r.value)
214 }
215}
216impl Eq for TimeSeriesId {}
217
218impl Hash for TimeSeriesId {
219 fn hash<H: Hasher>(&self, state: &mut H) {
220 for label in &self.labels {
221 label.name.hash(state);
222 label.value.hash(state);
223 }
224 }
225}
226
227impl Ord for TimeSeriesId {
229 fn cmp(&self, other: &Self) -> Ordering {
230 let ordering = self.labels.len().cmp(&other.labels.len());
231 if ordering != Ordering::Equal {
232 return ordering;
233 }
234
235 for (l, r) in self.labels.iter().zip(other.labels.iter()) {
236 let ordering = l.name.cmp(&r.name);
237
238 if ordering != Ordering::Equal {
239 return ordering;
240 }
241
242 let ordering = l.value.cmp(&r.value);
243
244 if ordering != Ordering::Equal {
245 return ordering;
246 }
247 }
248 Ordering::Equal
249 }
250}
251
252impl PartialOrd for TimeSeriesId {
253 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
254 Some(self.cmp(other))
255 }
256}
257
258fn collect_timeseries_ids(table_name: &str, recordbatch: &RecordBatch) -> Vec<TimeSeriesId> {
261 let row_count = recordbatch.num_rows();
262 let mut timeseries_ids = Vec::with_capacity(row_count);
263
264 let column_names = recordbatch
265 .schema
266 .column_schemas()
267 .iter()
268 .map(|column_schema| &column_schema.name);
269 let columns = column_names
270 .enumerate()
271 .filter(|(_, column_name)| {
272 *column_name != greptime_timestamp() && *column_name != greptime_value()
273 })
274 .map(|(i, column_name)| {
275 (
276 column_name,
277 recordbatch.iter_column_as_string(i).collect::<Vec<_>>(),
278 )
279 })
280 .collect::<Vec<_>>();
281
282 for row in 0..row_count {
283 let mut labels = Vec::with_capacity(recordbatch.num_columns() - 1);
284 labels.push(new_label(
285 METRIC_NAME_LABEL.to_string(),
286 table_name.to_string(),
287 ));
288
289 for (column_name, column_values) in columns.iter() {
290 if let Some(value) = &column_values[row] {
291 labels.push(new_label((*column_name).clone(), value.clone()));
292 }
293 }
294 timeseries_ids.push(TimeSeriesId { labels });
295 }
296 timeseries_ids
297}
298
299pub fn recordbatches_to_timeseries(
300 table_name: &str,
301 recordbatches: RecordBatches,
302) -> Result<Vec<TimeSeries>> {
303 Ok(recordbatches
304 .take()
305 .into_iter()
306 .map(|x| recordbatch_to_timeseries(table_name, x))
307 .collect::<Result<Vec<_>>>()?
308 .into_iter()
309 .flatten()
310 .collect())
311}
312
313fn recordbatch_to_timeseries(table: &str, recordbatch: RecordBatch) -> Result<Vec<TimeSeries>> {
314 let ts_column = recordbatch.column_by_name(greptime_timestamp()).context(
315 error::InvalidPromRemoteReadQueryResultSnafu {
316 msg: "missing greptime_timestamp column in query result",
317 },
318 )?;
319 let ts_column = ts_column
320 .as_primitive_opt::<TimestampMillisecondType>()
321 .with_context(|| error::InvalidPromRemoteReadQueryResultSnafu {
322 msg: format!(
323 "Expect timestamp column of datatype Timestamp(Millisecond), actual {:?}",
324 ts_column.data_type()
325 ),
326 })?;
327
328 let field_column = recordbatch.column_by_name(greptime_value()).context(
329 error::InvalidPromRemoteReadQueryResultSnafu {
330 msg: "missing greptime_value column in query result",
331 },
332 )?;
333 let field_column = field_column
334 .as_primitive_opt::<Float64Type>()
335 .with_context(|| error::InvalidPromRemoteReadQueryResultSnafu {
336 msg: format!(
337 "Expect value column of datatype Float64, actual {:?}",
338 field_column.data_type()
339 ),
340 })?;
341
342 let timeseries_ids = collect_timeseries_ids(table, &recordbatch);
344 let mut timeseries_map: BTreeMap<&TimeSeriesId, TimeSeries> = BTreeMap::default();
346
347 for (row, timeseries_id) in timeseries_ids.iter().enumerate() {
348 let timeseries = timeseries_map
349 .entry(timeseries_id)
350 .or_insert_with(|| TimeSeries {
351 labels: timeseries_id.labels.clone(),
352 ..Default::default()
353 });
354
355 if ts_column.is_null(row) || field_column.is_null(row) {
356 continue;
357 }
358
359 let value = field_column.value(row);
360 let timestamp = ts_column.value(row);
361 let sample = Sample { value, timestamp };
362
363 timeseries.samples.push(sample);
364 }
365
366 Ok(timeseries_map.into_values().collect())
367}
368
369pub fn to_grpc_row_insert_requests(request: &WriteRequest) -> Result<(RowInsertRequests, usize)> {
370 let _timer = crate::metrics::METRIC_HTTP_PROM_STORE_CONVERT_ELAPSED.start_timer();
371
372 let mut multi_table_data = MultiTableData::new();
373
374 for series in &request.timeseries {
375 let table_name = &series
376 .labels
377 .iter()
378 .find(|label| {
379 label.name == METRIC_NAME_LABEL
381 })
382 .context(error::InvalidPromRemoteRequestSnafu {
383 msg: "missing '__name__' label in time-series",
384 })?
385 .value;
386
387 let num_columns = series.labels.len() + 1;
390
391 let table_data = multi_table_data.get_or_default_table_data(
392 table_name,
393 num_columns,
394 series.samples.len(),
395 );
396
397 let kvs = series.labels.iter().filter_map(|label| {
399 if label.name == METRIC_NAME_LABEL {
400 None
401 } else {
402 Some((label.name.clone(), label.value.clone()))
403 }
404 });
405
406 if series.samples.len() == 1 {
407 let mut one_row = table_data.alloc_one_row();
408
409 row_writer::write_tags(table_data, kvs, &mut one_row)?;
410 row_writer::write_f64(
412 table_data,
413 greptime_value(),
414 series.samples[0].value,
415 &mut one_row,
416 )?;
417 row_writer::write_ts_to_millis(
419 table_data,
420 greptime_timestamp(),
421 Some(series.samples[0].timestamp),
422 Precision::Millisecond,
423 &mut one_row,
424 )?;
425
426 table_data.add_row(one_row);
427 } else {
428 for Sample { value, timestamp } in &series.samples {
429 let mut one_row = table_data.alloc_one_row();
430
431 let kvs = kvs.clone();
433 row_writer::write_tags(table_data, kvs, &mut one_row)?;
434 row_writer::write_f64(table_data, greptime_value(), *value, &mut one_row)?;
436 row_writer::write_ts_to_millis(
438 table_data,
439 greptime_timestamp(),
440 Some(*timestamp),
441 Precision::Millisecond,
442 &mut one_row,
443 )?;
444
445 table_data.add_row(one_row);
446 }
447 }
448
449 if !series.histograms.is_empty() {
450 warn!("Native histograms are not supported yet, data ignored");
451 }
452 }
453
454 Ok(multi_table_data.into_row_insert_requests())
455}
456
457#[inline]
458pub fn snappy_decompress(buf: &[u8]) -> Result<Vec<u8>> {
459 let mut decoder = Decoder::new();
460 decoder
461 .decompress_vec(buf)
462 .context(error::DecompressSnappyPromRemoteRequestSnafu)
463}
464
465#[inline]
466pub fn snappy_compress(buf: &[u8]) -> Result<Vec<u8>> {
467 let mut encoder = Encoder::new();
468 encoder
469 .compress_vec(buf)
470 .context(error::CompressPromRemoteRequestSnafu)
471}
472
473#[inline]
474pub fn zstd_decompress(buf: &[u8]) -> Result<Vec<u8>> {
475 zstd::stream::decode_all(buf).context(error::DecompressZstdPromRemoteRequestSnafu)
476}
477
478pub fn mock_timeseries() -> Vec<TimeSeries> {
481 vec![
482 TimeSeries {
483 labels: vec![
484 new_label(METRIC_NAME_LABEL.to_string(), "metric1".to_string()),
485 new_label("job".to_string(), "spark".to_string()),
486 ],
487 samples: vec![
488 Sample {
489 value: 1.0f64,
490 timestamp: 1000,
491 },
492 Sample {
493 value: 2.0f64,
494 timestamp: 2000,
495 },
496 ],
497 ..Default::default()
498 },
499 TimeSeries {
500 labels: vec![
501 new_label(METRIC_NAME_LABEL.to_string(), "metric2".to_string()),
502 new_label("instance".to_string(), "test_host1".to_string()),
503 new_label("idc".to_string(), "z001".to_string()),
504 ],
505 samples: vec![
506 Sample {
507 value: 3.0f64,
508 timestamp: 1000,
509 },
510 Sample {
511 value: 4.0f64,
512 timestamp: 2000,
513 },
514 ],
515 ..Default::default()
516 },
517 TimeSeries {
518 labels: vec![
519 new_label(METRIC_NAME_LABEL.to_string(), "metric3".to_string()),
520 new_label("idc".to_string(), "z002".to_string()),
521 new_label("app".to_string(), "biz".to_string()),
522 ],
523 samples: vec![
524 Sample {
525 value: 5.0f64,
526 timestamp: 1000,
527 },
528 Sample {
529 value: 6.0f64,
530 timestamp: 2000,
531 },
532 Sample {
533 value: 7.0f64,
534 timestamp: 3000,
535 },
536 ],
537 ..Default::default()
538 },
539 ]
540}
541
542pub fn mock_timeseries_new_label() -> Vec<TimeSeries> {
544 let ts_demo_metrics = TimeSeries {
545 labels: vec![
546 new_label(METRIC_NAME_LABEL.to_string(), "demo_metrics".to_string()),
547 new_label("idc".to_string(), "idc3".to_string()),
548 new_label("new_label1".to_string(), "foo".to_string()),
549 ],
550 samples: vec![Sample {
551 value: 42.0,
552 timestamp: 3000,
553 }],
554 ..Default::default()
555 };
556 let ts_multi_labels = TimeSeries {
557 labels: vec![
558 new_label(METRIC_NAME_LABEL.to_string(), "metric1".to_string()),
559 new_label("idc".to_string(), "idc4".to_string()),
560 new_label("env".to_string(), "prod".to_string()),
561 new_label("host".to_string(), "host9".to_string()),
562 new_label("new_label2".to_string(), "bar".to_string()),
563 ],
564 samples: vec![Sample {
565 value: 99.0,
566 timestamp: 4000,
567 }],
568 ..Default::default()
569 };
570
571 vec![ts_demo_metrics, ts_multi_labels]
572}
573
574pub fn mock_timeseries_special_labels() -> Vec<TimeSeries> {
576 let idc3_schema = TimeSeries {
577 labels: vec![
578 new_label(METRIC_NAME_LABEL.to_string(), "idc3_lo_table".to_string()),
579 new_label(DATABASE_LABEL.to_string(), "idc3".to_string()),
580 new_label(PHYSICAL_TABLE_LABEL.to_string(), "f1".to_string()),
581 ],
582 samples: vec![Sample {
583 value: 42.0,
584 timestamp: 3000,
585 }],
586 ..Default::default()
587 };
588 let idc4_schema = TimeSeries {
589 labels: vec![
590 new_label(
591 METRIC_NAME_LABEL.to_string(),
592 "idc4_local_table".to_string(),
593 ),
594 new_label(DATABASE_LABEL.to_string(), "idc4".to_string()),
595 new_label(PHYSICAL_TABLE_LABEL.to_string(), "f2".to_string()),
596 ],
597 samples: vec![Sample {
598 value: 99.0,
599 timestamp: 4000,
600 }],
601 ..Default::default()
602 };
603
604 vec![idc3_schema, idc4_schema]
605}
606
607#[cfg(test)]
608mod tests {
609 use std::sync::Arc;
610
611 use api::prom_store::remote::LabelMatcher;
612 use api::v1::{ColumnDataType, Row, SemanticType};
613 use datafusion::prelude::SessionContext;
614 use datatypes::data_type::ConcreteDataType;
615 use datatypes::schema::{ColumnSchema, Schema};
616 use datatypes::vectors::{Float64Vector, StringVector, TimestampMillisecondVector};
617 use table::table::adapter::DfTableProviderAdapter;
618 use table::test_util::MemTable;
619
620 use super::*;
621
    // `LabelMatcher::r#type` is a plain `i32`, so the enum variants used by
    // the tests are cast once here.
    const EQ_TYPE: i32 = MatcherType::Eq as i32;
    const NEQ_TYPE: i32 = MatcherType::Neq as i32;
    const RE_TYPE: i32 = MatcherType::Re as i32;
625
626 #[test]
627 fn test_table_name() {
628 let q = Query {
629 start_timestamp_ms: 1000,
630 end_timestamp_ms: 2000,
631 matchers: vec![],
632 ..Default::default()
633 };
634 let err = table_name(&q).unwrap_err();
635 assert!(matches!(err, error::Error::InvalidPromRemoteRequest { .. }));
636
637 let q = Query {
638 start_timestamp_ms: 1000,
639 end_timestamp_ms: 2000,
640 matchers: vec![LabelMatcher {
641 name: METRIC_NAME_LABEL.to_string(),
642 value: "test".to_string(),
643 r#type: EQ_TYPE,
644 }],
645 ..Default::default()
646 };
647 assert_eq!("test", table_name(&q).unwrap());
648 }
649
650 #[test]
651 fn test_query_to_plan() {
652 let q = Query {
653 start_timestamp_ms: 1000,
654 end_timestamp_ms: 2000,
655 matchers: vec![LabelMatcher {
656 name: METRIC_NAME_LABEL.to_string(),
657 value: "test".to_string(),
658 r#type: EQ_TYPE,
659 }],
660 ..Default::default()
661 };
662
663 let schema = Arc::new(Schema::new(vec![
664 ColumnSchema::new(
665 greptime_timestamp(),
666 ConcreteDataType::timestamp_millisecond_datatype(),
667 true,
668 ),
669 ColumnSchema::new(greptime_value(), ConcreteDataType::float64_datatype(), true),
670 ColumnSchema::new("instance", ConcreteDataType::string_datatype(), true),
671 ColumnSchema::new("job", ConcreteDataType::string_datatype(), true),
672 ]));
673 let recordbatch = RecordBatch::new(
674 schema,
675 vec![
676 Arc::new(TimestampMillisecondVector::from_vec(vec![1000])) as _,
677 Arc::new(Float64Vector::from_vec(vec![3.0])) as _,
678 Arc::new(StringVector::from(vec!["host1"])) as _,
679 Arc::new(StringVector::from(vec!["job"])) as _,
680 ],
681 )
682 .unwrap();
683
684 let ctx = SessionContext::new();
685 let table = MemTable::table("test", recordbatch);
686 let table_provider = Arc::new(DfTableProviderAdapter::new(table));
687
688 let dataframe = ctx.read_table(table_provider.clone()).unwrap();
689 let plan = query_to_plan(dataframe, &q).unwrap();
690 let display_string = format!("{}", plan.display_indent());
691
692 let ts_col = greptime_timestamp();
693 let expected = format!(
694 "Filter: ?table?.{} >= TimestampMillisecond(1000, None) AND ?table?.{} <= TimestampMillisecond(2000, None)\n TableScan: ?table?",
695 ts_col, ts_col
696 );
697 assert_eq!(expected, display_string);
698
699 let q = Query {
700 start_timestamp_ms: 1000,
701 end_timestamp_ms: 2000,
702 matchers: vec![
703 LabelMatcher {
704 name: METRIC_NAME_LABEL.to_string(),
705 value: "test".to_string(),
706 r#type: EQ_TYPE,
707 },
708 LabelMatcher {
709 name: "job".to_string(),
710 value: "*prom*".to_string(),
711 r#type: RE_TYPE,
712 },
713 LabelMatcher {
714 name: "instance".to_string(),
715 value: "localhost".to_string(),
716 r#type: NEQ_TYPE,
717 },
718 ],
719 ..Default::default()
720 };
721
722 let dataframe = ctx.read_table(table_provider).unwrap();
723 let plan = query_to_plan(dataframe, &q).unwrap();
724 let display_string = format!("{}", plan.display_indent());
725
726 let ts_col = greptime_timestamp();
727 let expected = format!(
728 "Filter: ?table?.{} >= TimestampMillisecond(1000, None) AND ?table?.{} <= TimestampMillisecond(2000, None) AND regexp_match(?table?.job, Utf8(\"*prom*\")) IS NOT NULL AND ?table?.instance != Utf8(\"localhost\")\n TableScan: ?table?",
729 ts_col, ts_col
730 );
731 assert_eq!(expected, display_string);
732 }
733
734 fn column_schemas_with(
735 mut kts_iter: Vec<(&str, ColumnDataType, SemanticType)>,
736 ) -> Vec<api::v1::ColumnSchema> {
737 kts_iter.push((
738 greptime_value(),
739 ColumnDataType::Float64,
740 SemanticType::Field,
741 ));
742 kts_iter.push((
743 greptime_timestamp(),
744 ColumnDataType::TimestampMillisecond,
745 SemanticType::Timestamp,
746 ));
747
748 kts_iter
749 .into_iter()
750 .map(|(k, t, s)| api::v1::ColumnSchema {
751 column_name: k.to_string(),
752 datatype: t as i32,
753 semantic_type: s as i32,
754 ..Default::default()
755 })
756 .collect()
757 }
758
759 fn make_row_with_label(l1: &str, value: f64, timestamp: i64) -> Row {
760 Row {
761 values: vec![
762 api::v1::Value {
763 value_data: Some(api::v1::value::ValueData::StringValue(l1.to_string())),
764 },
765 api::v1::Value {
766 value_data: Some(api::v1::value::ValueData::F64Value(value)),
767 },
768 api::v1::Value {
769 value_data: Some(api::v1::value::ValueData::TimestampMillisecondValue(
770 timestamp,
771 )),
772 },
773 ],
774 }
775 }
776
777 fn make_row_with_2_labels(l1: &str, l2: &str, value: f64, timestamp: i64) -> Row {
778 Row {
779 values: vec![
780 api::v1::Value {
781 value_data: Some(api::v1::value::ValueData::StringValue(l1.to_string())),
782 },
783 api::v1::Value {
784 value_data: Some(api::v1::value::ValueData::StringValue(l2.to_string())),
785 },
786 api::v1::Value {
787 value_data: Some(api::v1::value::ValueData::F64Value(value)),
788 },
789 api::v1::Value {
790 value_data: Some(api::v1::value::ValueData::TimestampMillisecondValue(
791 timestamp,
792 )),
793 },
794 ],
795 }
796 }
797
798 #[test]
799 fn test_write_request_to_row_insert_exprs() {
800 let write_request = WriteRequest {
801 timeseries: mock_timeseries(),
802 ..Default::default()
803 };
804
805 let mut exprs = to_grpc_row_insert_requests(&write_request)
806 .unwrap()
807 .0
808 .inserts;
809 exprs.sort_unstable_by(|l, r| l.table_name.cmp(&r.table_name));
810 assert_eq!(3, exprs.len());
811 assert_eq!("metric1", exprs[0].table_name);
812 assert_eq!("metric2", exprs[1].table_name);
813 assert_eq!("metric3", exprs[2].table_name);
814
815 let rows = exprs[0].rows.as_ref().unwrap();
816 let schema = &rows.schema;
817 let rows = &rows.rows;
818 assert_eq!(2, rows.len());
819 assert_eq!(3, schema.len());
820 assert_eq!(
821 column_schemas_with(vec![("job", ColumnDataType::String, SemanticType::Tag)]),
822 *schema
823 );
824 assert_eq!(
825 &vec![
826 make_row_with_label("spark", 1.0, 1000),
827 make_row_with_label("spark", 2.0, 2000),
828 ],
829 rows
830 );
831
832 let rows = exprs[1].rows.as_ref().unwrap();
833 let schema = &rows.schema;
834 let rows = &rows.rows;
835 assert_eq!(2, rows.len());
836 assert_eq!(4, schema.len());
837 assert_eq!(
838 column_schemas_with(vec![
839 ("instance", ColumnDataType::String, SemanticType::Tag),
840 ("idc", ColumnDataType::String, SemanticType::Tag)
841 ]),
842 *schema
843 );
844 assert_eq!(
845 &vec![
846 make_row_with_2_labels("test_host1", "z001", 3.0, 1000),
847 make_row_with_2_labels("test_host1", "z001", 4.0, 2000),
848 ],
849 rows
850 );
851
852 let rows = exprs[2].rows.as_ref().unwrap();
853 let schema = &rows.schema;
854 let rows = &rows.rows;
855 assert_eq!(3, rows.len());
856 assert_eq!(4, schema.len());
857 assert_eq!(
858 column_schemas_with(vec![
859 ("idc", ColumnDataType::String, SemanticType::Tag),
860 ("app", ColumnDataType::String, SemanticType::Tag)
861 ]),
862 *schema
863 );
864 assert_eq!(
865 &vec![
866 make_row_with_2_labels("z002", "biz", 5.0, 1000),
867 make_row_with_2_labels("z002", "biz", 6.0, 2000),
868 make_row_with_2_labels("z002", "biz", 7.0, 3000),
869 ],
870 rows
871 );
872 }
873
874 #[test]
875 fn test_recordbatches_to_timeseries() {
876 let schema = Arc::new(Schema::new(vec![
877 ColumnSchema::new(
878 greptime_timestamp(),
879 ConcreteDataType::timestamp_millisecond_datatype(),
880 true,
881 ),
882 ColumnSchema::new(greptime_value(), ConcreteDataType::float64_datatype(), true),
883 ColumnSchema::new("instance", ConcreteDataType::string_datatype(), true),
884 ]));
885
886 let recordbatches = RecordBatches::try_new(
887 schema.clone(),
888 vec![
889 RecordBatch::new(
890 schema.clone(),
891 vec![
892 Arc::new(TimestampMillisecondVector::from_vec(vec![1000])) as _,
893 Arc::new(Float64Vector::from_vec(vec![3.0])) as _,
894 Arc::new(StringVector::from(vec!["host1"])) as _,
895 ],
896 )
897 .unwrap(),
898 RecordBatch::new(
899 schema,
900 vec![
901 Arc::new(TimestampMillisecondVector::from_vec(vec![2000])) as _,
902 Arc::new(Float64Vector::from_vec(vec![7.0])) as _,
903 Arc::new(StringVector::from(vec!["host2"])) as _,
904 ],
905 )
906 .unwrap(),
907 ],
908 )
909 .unwrap();
910
911 let timeseries = recordbatches_to_timeseries("metric1", recordbatches).unwrap();
912 assert_eq!(2, timeseries.len());
913
914 assert_eq!(
915 vec![
916 Label {
917 name: METRIC_NAME_LABEL.to_string(),
918 value: "metric1".to_string(),
919 },
920 Label {
921 name: "instance".to_string(),
922 value: "host1".to_string(),
923 },
924 ],
925 timeseries[0].labels
926 );
927
928 assert_eq!(
929 timeseries[0].samples,
930 vec![Sample {
931 value: 3.0,
932 timestamp: 1000,
933 }]
934 );
935
936 assert_eq!(
937 vec![
938 Label {
939 name: METRIC_NAME_LABEL.to_string(),
940 value: "metric1".to_string(),
941 },
942 Label {
943 name: "instance".to_string(),
944 value: "host2".to_string(),
945 },
946 ],
947 timeseries[1].labels
948 );
949 assert_eq!(
950 timeseries[1].samples,
951 vec![Sample {
952 value: 7.0,
953 timestamp: 2000,
954 }]
955 );
956 }
957}