1use std::collections::hash_map::Entry;
16use std::collections::{HashMap, HashSet};
17
18#[cfg(test)]
19use api::greptime_proto::io::prometheus::write::v2::{Exemplar, Histogram, Metadata, metadata};
20use api::greptime_proto::io::prometheus::write::v2::{Request, Sample, TimeSeries};
21use api::v1::{RowInsertRequest, Rows, Value};
22use bytes::Bytes;
23use common_grpc::precision::Precision;
24use common_query::prelude::{greptime_timestamp, greptime_value};
25use pipeline::{ContextOpt, ContextReq};
26use prost::Message;
27use snafu::{OptionExt, ResultExt, ensure};
28
29use crate::error::{self, Result};
30use crate::prom_remote_write::row_builder::PromCtx;
31use crate::prom_remote_write::try_decompress;
32#[allow(deprecated)]
33use crate::prom_store::{
34 DATABASE_LABEL, DATABASE_LABEL_ALT, METRIC_NAME_LABEL, PHYSICAL_TABLE_LABEL,
35 PHYSICAL_TABLE_LABEL_ALT, SCHEMA_LABEL,
36};
37use crate::row_writer::{self, TableData};
38
/// Label name/value pairs that become tag columns, in the order they appear in a series.
type PromTags = Vec<(String, String)>;
/// `(routing context, table name, tag columns)` resolved from one series' labels.
type ResolvedSeriesLabels = (PromCtx, String, PromTags);
41
42pub(crate) fn decode_remote_write_v2_request(is_zstd: bool, body: Bytes) -> Result<Request> {
43 let _timer = crate::metrics::METRIC_HTTP_PROM_STORE_DECODE_ELAPSED.start_timer();
44
45 let buf = if let Ok(buf) = try_decompress(is_zstd, &body[..]) {
48 buf
49 } else {
50 try_decompress(!is_zstd, &body[..])?
51 };
52
53 Request::decode(&buf[..]).context(error::DecodePromRemoteRequestSnafu)
54}
55
56pub(crate) trait RemoteWriteV2RequestExt {
57 fn into_context_req(self) -> Result<ContextReq>;
58}
59
60impl RemoteWriteV2RequestExt for Request {
61 fn into_context_req(self) -> Result<ContextReq> {
62 let _timer = crate::metrics::METRIC_HTTP_PROM_STORE_CONVERT_ELAPSED.start_timer();
63 let Request {
64 symbols,
65 timeseries,
66 } = self;
67
68 ensure!(
69 symbols.first().map(|s| s.as_str()) == Some(""),
70 error::InvalidPromRemoteRequestSnafu {
71 msg: "remote write v2 symbols must start with an empty string".to_string(),
72 }
73 );
74
75 let mut tables = HashMap::<PromCtx, HashMap<String, TableData>>::new();
76
77 for series in timeseries {
78 let sample_count = series.samples.len();
81 if sample_count == 0 {
82 continue;
83 }
84
85 let (prom_ctx, table_name, tags) = resolve_series_labels(&symbols, &series)?;
86 let table_data = match tables.entry(prom_ctx).or_default().entry(table_name) {
87 Entry::Occupied(entry) => {
88 let table_data = entry.into_mut();
89 table_data.reserve_rows(sample_count);
90 table_data
91 }
92 Entry::Vacant(entry) => entry.insert(TableData::new(tags.len() + 2, sample_count)),
93 };
94
95 write_samples(table_data, series.samples, tags)?;
96 }
97
98 Ok(into_context_req(tables))
99 }
100}
101
102fn write_samples(
103 table_data: &mut TableData,
104 mut samples: Vec<Sample>,
105 tags: PromTags,
106) -> Result<()> {
107 let Some(last_sample) = samples.pop() else {
108 return Ok(());
109 };
110
111 for sample in &samples {
112 write_sample(table_data, sample, tags.iter().cloned())?;
113 }
114
115 write_sample(table_data, &last_sample, tags.into_iter())
116}
117
118fn write_sample(
119 table_data: &mut TableData,
120 sample: &Sample,
121 tags: impl Iterator<Item = (String, String)>,
122) -> Result<()> {
123 let mut row = table_data.alloc_one_row();
124 row_writer::write_ts_to_millis(
125 table_data,
126 greptime_timestamp(),
127 Some(sample.timestamp),
128 Precision::Millisecond,
129 &mut row,
130 )?;
131 row_writer::write_f64(table_data, greptime_value(), sample.value, &mut row)?;
132 row_writer::write_tags(table_data, tags, &mut row)?;
133 table_data.add_row(row);
134
135 Ok(())
136}
137
138fn resolve_series_labels(symbols: &[String], series: &TimeSeries) -> Result<ResolvedSeriesLabels> {
139 ensure!(
140 series.labels_refs.len().is_multiple_of(2),
141 error::InvalidPromRemoteRequestSnafu {
142 msg: "remote write v2 labels_refs must contain name/value pairs".to_string(),
143 }
144 );
145
146 let mut prom_ctx = PromCtx::default();
147 let mut table_name = None;
148 let mut tags = Vec::with_capacity(series.labels_refs.len() / 2);
149 let mut label_names = HashSet::with_capacity(series.labels_refs.len() / 2);
150
151 for pair in series.labels_refs.chunks_exact(2) {
152 let name = symbol_ref(symbols, pair[0], "label name")?;
153 let value = symbol_ref(symbols, pair[1], "label value")?;
154 validate_label(name, value)?;
155 ensure!(
156 label_names.insert(name),
157 error::InvalidPromRemoteRequestSnafu {
158 msg: format!("remote write v2 label name `{name}` is repeated"),
159 }
160 );
161
162 if name == METRIC_NAME_LABEL {
163 table_name = Some(value.to_string());
164 continue;
165 }
166 if apply_remote_write_special_label(name, value, &mut prom_ctx) {
167 continue;
168 }
169
170 tags.push((name.to_string(), value.to_string()));
171 }
172
173 let table_name = table_name.context(error::InvalidPromRemoteRequestSnafu {
174 msg: "missing '__name__' label in time-series".to_string(),
175 })?;
176
177 Ok((prom_ctx, table_name, tags))
178}
179
180fn validate_label(name: &str, value: &str) -> Result<()> {
181 ensure!(
182 !name.is_empty(),
183 error::InvalidPromRemoteRequestSnafu {
184 msg: "remote write v2 label names must not be empty".to_string(),
185 }
186 );
187 ensure!(
188 !value.is_empty(),
189 error::InvalidPromRemoteRequestSnafu {
190 msg: format!("remote write v2 label `{name}` value must not be empty"),
191 }
192 );
193
194 Ok(())
195}
196
197fn symbol_ref<'a>(symbols: &'a [String], idx: u32, field: &str) -> Result<&'a str> {
198 symbols
199 .get(idx as usize)
200 .map(String::as_str)
201 .with_context(|| error::InvalidPromRemoteRequestSnafu {
202 msg: format!(
203 "remote write v2 {field} symbol reference {idx} is out of range, symbols len: {}",
204 symbols.len()
205 ),
206 })
207}
208
209#[allow(deprecated)]
210fn apply_remote_write_special_label(name: &str, value: &str, prom_ctx: &mut PromCtx) -> bool {
211 match name {
212 SCHEMA_LABEL => {
213 prom_ctx.schema = Some(value.to_string());
214 true
215 }
216 DATABASE_LABEL | DATABASE_LABEL_ALT => {
217 if prom_ctx.schema.is_none() {
218 prom_ctx.schema = Some(value.to_string());
219 }
220 true
221 }
222 PHYSICAL_TABLE_LABEL | PHYSICAL_TABLE_LABEL_ALT => {
223 prom_ctx.physical_table = Some(value.to_string());
224 true
225 }
226 _ => false,
227 }
228}
229
230fn into_context_req(tables: HashMap<PromCtx, HashMap<String, TableData>>) -> ContextReq {
231 let mut ctx_req = ContextReq::default();
232 for (prom_ctx, tables) in tables {
233 let mut opt = ContextOpt::default();
234 if let Some(schema) = prom_ctx.schema {
235 opt.set_schema(schema);
236 }
237 if let Some(physical_table) = prom_ctx.physical_table {
238 opt.set_physical_table(physical_table);
239 }
240
241 ctx_req.add_rows(
242 opt,
243 tables.into_iter().map(|(table_name, table_data)| {
244 table_data_to_row_insert_request(table_name, table_data)
245 }),
246 );
247 }
248 ctx_req
249}
250
251fn table_data_to_row_insert_request(table_name: String, table_data: TableData) -> RowInsertRequest {
252 let num_columns = table_data.num_columns();
253 let (schema, mut rows) = table_data.into_schema_and_rows();
254 for row in &mut rows {
255 if num_columns > row.values.len() {
256 row.values.resize(num_columns, Value { value_data: None });
257 }
258 }
259
260 RowInsertRequest {
261 table_name,
262 rows: Some(Rows { schema, rows }),
263 }
264}
265
#[cfg(any(test, feature = "testing"))]
pub mod test_util {
    //! Builders for single-series remote write v2 requests used in tests.

    use api::greptime_proto::io::prometheus::write::v2::{Histogram, Request, Sample, TimeSeries};

    /// One-series request with the given labels and float samples, no histograms.
    pub fn request_with_labels_and_samples(
        labels: Vec<(&str, &str)>,
        samples: Vec<Sample>,
    ) -> Request {
        request_with_labels(labels, samples, Vec::new())
    }

    /// One-series request with the given labels and histograms, no float samples.
    pub fn request_with_labels_and_histograms(
        labels: Vec<(&str, &str)>,
        histograms: Vec<Histogram>,
    ) -> Request {
        request_with_labels(labels, Vec::new(), histograms)
    }

    /// Histogram at `timestamp` with every other field at its default.
    pub fn histogram(timestamp: i64) -> Histogram {
        Histogram {
            timestamp,
            ..Default::default()
        }
    }

    /// Interns `labels` into a fresh symbol table and wraps them in a single series.
    fn request_with_labels(
        labels: Vec<(&str, &str)>,
        samples: Vec<Sample>,
        histograms: Vec<Histogram>,
    ) -> Request {
        // Symbol 0 is the mandatory empty string.
        let mut symbols = vec![String::new()];
        let labels_refs: Vec<u32> = labels
            .into_iter()
            .flat_map(|(name, value)| [name, value])
            .map(|symbol| push_symbol(&mut symbols, symbol))
            .collect();

        let series = TimeSeries {
            labels_refs,
            samples,
            histograms,
            exemplars: Vec::new(),
            metadata: None,
        };
        Request {
            symbols,
            timeseries: vec![series],
        }
    }

    /// Returns the index of `symbol`, appending it first if it is not yet present.
    fn push_symbol(symbols: &mut Vec<String>, symbol: &str) -> u32 {
        let idx = match symbols.iter().position(|existing| existing == symbol) {
            Some(found) => found,
            None => {
                symbols.push(symbol.to_string());
                symbols.len() - 1
            }
        };
        idx as u32
    }
}
325
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use api::v1::value::ValueData;
    use common_query::prelude::{greptime_timestamp, greptime_value};
    use session::context::QueryContext;

    use super::*;
    use crate::error;
    use crate::http::prom_store::PHYSICAL_TABLE_PARAM;
    use crate::prom_store::{DATABASE_LABEL, PHYSICAL_TABLE_LABEL};

    /// A snappy-compressed request round-trips through `decode_remote_write_v2_request`.
    #[test]
    fn test_decode_remote_write_v2_request() {
        let request = Request {
            symbols: vec![
                "".to_string(),
                "__name__".to_string(),
                "http_requests_total".to_string(),
            ],
            timeseries: vec![TimeSeries {
                labels_refs: vec![1, 2],
                samples: vec![Sample {
                    value: 42.0,
                    timestamp: 1000,
                    start_timestamp: 0,
                }],
                histograms: Vec::new(),
                exemplars: Vec::new(),
                metadata: Some(Metadata {
                    r#type: metadata::MetricType::Counter as i32,
                    help_ref: 0,
                    unit_ref: 0,
                }),
            }],
        };
        let body =
            Bytes::from(crate::prom_store::snappy_compress(&request.encode_to_vec()).unwrap());

        let decoded = decode_remote_write_v2_request(false, body).unwrap();

        assert_eq!(decoded.symbols, request.symbols);
        assert_eq!(decoded.timeseries.len(), 1);
        assert_eq!(decoded.timeseries[0].labels_refs, vec![1, 2]);
        assert_eq!(decoded.timeseries[0].samples.len(), 1);
        assert_eq!(decoded.timeseries[0].samples[0].value, 42.0);
        // Compare against the enum rather than a bare discriminant.
        assert_eq!(
            decoded.timeseries[0].metadata.as_ref().unwrap().r#type,
            metadata::MetricType::Counter as i32
        );
    }

    /// Each sample becomes one row: timestamp, value, then tags in label order.
    #[test]
    fn test_into_context_req_samples() {
        let ctx_req = test_util::request_with_labels_and_samples(
            vec![
                (METRIC_NAME_LABEL, "http_requests_total"),
                ("job", "api"),
                ("instance", "localhost:9090"),
            ],
            vec![
                Sample {
                    value: 42.0,
                    timestamp: 1000,
                    start_timestamp: 0,
                },
                Sample {
                    value: 43.0,
                    timestamp: 2000,
                    start_timestamp: 0,
                },
            ],
        )
        .into_context_req()
        .unwrap();

        let mut inserts = ctx_req.all_req().collect::<Vec<_>>();
        assert_eq!(inserts.len(), 1);

        let request = inserts.pop().unwrap();
        assert_eq!(request.table_name, "http_requests_total");
        let rows = request.rows.unwrap();
        assert_eq!(rows.rows.len(), 2);
        assert_eq!(
            rows.schema
                .iter()
                .map(|col| col.column_name.as_str())
                .collect::<Vec<_>>(),
            vec![greptime_timestamp(), greptime_value(), "job", "instance"]
        );
        assert_eq!(
            rows.rows[0].values[0].value_data,
            Some(ValueData::TimestampMillisecondValue(1000))
        );
        assert_eq!(
            rows.rows[0].values[1].value_data,
            Some(ValueData::F64Value(42.0))
        );
        assert_eq!(
            rows.rows[0].values[2].value_data,
            Some(ValueData::StringValue("api".to_string()))
        );
        assert_eq!(
            rows.rows[0].values[3].value_data,
            Some(ValueData::StringValue("localhost:9090".to_string()))
        );
        assert_eq!(
            rows.rows[1].values[0].value_data,
            Some(ValueData::TimestampMillisecondValue(2000))
        );
        assert_eq!(
            rows.rows[1].values[1].value_data,
            Some(ValueData::F64Value(43.0))
        );
    }

    /// Non-ASCII and dotted label names are accepted verbatim as column names.
    #[test]
    fn test_into_context_req_accepts_utf8_label_names() {
        let ctx_req = test_util::request_with_labels_and_samples(
            vec![
                (METRIC_NAME_LABEL, "http_requests_total"),
                ("service.name", "api"),
                ("区域", "华东"),
            ],
            vec![Sample {
                value: 42.0,
                timestamp: 1000,
                start_timestamp: 0,
            }],
        )
        .into_context_req()
        .unwrap();

        let mut inserts = ctx_req.all_req().collect::<Vec<_>>();
        assert_eq!(inserts.len(), 1);
        let rows = inserts.pop().unwrap().rows.unwrap();
        assert_eq!(
            rows.schema
                .iter()
                .map(|col| col.column_name.as_str())
                .collect::<Vec<_>>(),
            vec![
                greptime_timestamp(),
                greptime_value(),
                "service.name",
                "区域"
            ]
        );
        assert_eq!(
            rows.rows[0].values[2].value_data,
            Some(ValueData::StringValue("api".to_string()))
        );
        assert_eq!(
            rows.rows[0].values[3].value_data,
            Some(ValueData::StringValue("华东".to_string()))
        );
    }

    /// Database and physical-table labels route the insert and are not stored as tags.
    #[test]
    fn test_into_context_req_special_labels() {
        let ctx_req = test_util::request_with_labels_and_samples(
            vec![
                (METRIC_NAME_LABEL, "cpu_usage"),
                (DATABASE_LABEL, "tenant_a"),
                (PHYSICAL_TABLE_LABEL, "metrics_physical"),
                ("job", "api"),
            ],
            vec![Sample {
                value: 1.0,
                timestamp: 1000,
                start_timestamp: 0,
            }],
        )
        .into_context_req()
        .unwrap();

        let mut iter = ctx_req.as_req_iter(Arc::new(QueryContext::with("greptime", "public")));
        let (ctx, reqs) = iter.next().unwrap();
        assert!(iter.next().is_none());

        assert_eq!(ctx.current_schema(), "tenant_a");
        assert_eq!(
            ctx.extension(PHYSICAL_TABLE_PARAM),
            Some("metrics_physical")
        );
        assert_eq!(reqs.inserts.len(), 1);

        let rows = reqs.inserts[0].rows.as_ref().unwrap();
        assert_eq!(
            rows.schema
                .iter()
                .map(|col| col.column_name.as_str())
                .collect::<Vec<_>>(),
            vec![greptime_timestamp(), greptime_value(), "job"]
        );
    }

    /// The schema label takes priority over the database label regardless of label order.
    #[test]
    #[allow(deprecated)]
    fn test_into_context_req_schema_label_overrides_database_label() {
        let orderings = [
            vec![
                (METRIC_NAME_LABEL, "cpu_usage"),
                (DATABASE_LABEL, "tenant_a"),
                (SCHEMA_LABEL, "tenant_b"),
            ],
            vec![
                (METRIC_NAME_LABEL, "cpu_usage"),
                (SCHEMA_LABEL, "tenant_b"),
                (DATABASE_LABEL, "tenant_a"),
            ],
        ];

        for labels in orderings {
            let ctx_req = request_with_sample(labels).into_context_req().unwrap();

            let mut iter =
                ctx_req.as_req_iter(Arc::new(QueryContext::with("greptime", "public")));
            let (ctx, _reqs) = iter.next().unwrap();
            assert!(iter.next().is_none());
            assert_eq!(ctx.current_schema(), "tenant_b");
        }
    }

    /// Two series for the same table share one insert, with rows in series order.
    #[test]
    fn test_into_context_req_merges_series_into_same_table() {
        let mut request = request_with_sample(vec![(METRIC_NAME_LABEL, "metric"), ("job", "api")]);
        let mut second = request.timeseries[0].clone();
        second.samples[0].timestamp = 2000;
        request.timeseries.push(second);

        let ctx_req = request.into_context_req().unwrap();
        let mut inserts = ctx_req.all_req().collect::<Vec<_>>();
        assert_eq!(inserts.len(), 1);

        let rows = inserts.pop().unwrap().rows.unwrap();
        assert_eq!(rows.rows.len(), 2);
        assert_eq!(
            rows.rows[0].values[0].value_data,
            Some(ValueData::TimestampMillisecondValue(1000))
        );
        assert_eq!(
            rows.rows[1].values[0].value_data,
            Some(ValueData::TimestampMillisecondValue(2000))
        );
    }

    /// Malformed requests are rejected with `InvalidPromRemoteRequest` and a specific message.
    #[test]
    fn test_into_context_req_rejects_invalid_requests() {
        let mut cases = Vec::new();

        cases.push((
            "missing metric name",
            request_with_sample(vec![("job", "api")]),
            "missing '__name__'",
        ));

        let mut request = request_with_sample(vec![(METRIC_NAME_LABEL, "metric")]);
        request.timeseries[0].labels_refs.push(1);
        cases.push((
            "odd label refs",
            request,
            "labels_refs must contain name/value pairs",
        ));

        let mut request = request_with_sample(vec![(METRIC_NAME_LABEL, "metric")]);
        request.timeseries[0].labels_refs[1] = 99;
        cases.push((
            "out of range symbol ref",
            request,
            "symbol reference 99 is out of range",
        ));

        let mut request = request_with_sample(vec![(METRIC_NAME_LABEL, "metric")]);
        request.symbols[0] = "not-empty".to_string();
        cases.push((
            "non-empty first symbol",
            request,
            "symbols must start with an empty string",
        ));

        cases.push((
            "repeated label name",
            request_with_sample(vec![
                (METRIC_NAME_LABEL, "metric"),
                ("job", "api"),
                ("job", "worker"),
            ]),
            "label name `job` is repeated",
        ));

        cases.push((
            "empty label name",
            request_with_sample(vec![(METRIC_NAME_LABEL, "metric"), ("", "api")]),
            "label names must not be empty",
        ));

        cases.push((
            "empty label value",
            request_with_sample(vec![(METRIC_NAME_LABEL, "metric"), ("job", "")]),
            "label `job` value must not be empty",
        ));

        for (name, request, expected) in cases {
            assert_invalid(name, request, expected);
        }
    }

    /// Histograms and exemplars alongside float samples are ignored, not rejected.
    #[test]
    fn test_into_context_req_ignores_histograms_and_exemplars() {
        let mut request = test_util::request_with_labels_and_samples(
            vec![(METRIC_NAME_LABEL, "metric")],
            vec![Sample {
                value: 1.0,
                timestamp: 1000,
                start_timestamp: 0,
            }],
        );
        request.timeseries[0].histograms.push(Histogram::default());
        request.timeseries[0].exemplars.push(Exemplar::default());

        let ctx_req = request.into_context_req().unwrap();

        assert_eq!(ctx_req.all_req().count(), 1);
    }

    /// A series carrying only histograms produces no insert at all.
    #[test]
    fn test_into_context_req_skips_histogram_only_series() {
        let mut request =
            test_util::request_with_labels_and_samples(vec![(METRIC_NAME_LABEL, "metric")], vec![]);
        request.timeseries[0].histograms.push(Histogram::default());

        let ctx_req = request.into_context_req().unwrap();

        assert_eq!(ctx_req.all_req().count(), 0);
    }

    /// Builds a one-series request with `labels` and a single sample `(1000, 1.0)`.
    fn request_with_sample(labels: Vec<(&str, &str)>) -> Request {
        test_util::request_with_labels_and_samples(
            labels,
            vec![Sample {
                value: 1.0,
                timestamp: 1000,
                start_timestamp: 0,
            }],
        )
    }

    /// Asserts that converting `request` fails with `InvalidPromRemoteRequest` mentioning `expected`.
    fn assert_invalid(name: &str, request: Request, expected: &str) {
        let err = request.into_context_req().unwrap_err();
        assert!(
            matches!(err, error::Error::InvalidPromRemoteRequest { .. }),
            "{name}: expected invalid request error, got {err}"
        );
        assert!(
            err.to_string().contains(expected),
            "{name}: expected error containing {expected:?}, got {err}"
        );
    }
}