Skip to main content

servers/prom_remote_write/
v2.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::hash_map::Entry;
16use std::collections::{HashMap, HashSet};
17
18#[cfg(test)]
19use api::greptime_proto::io::prometheus::write::v2::{Exemplar, Histogram, Metadata, metadata};
20use api::greptime_proto::io::prometheus::write::v2::{Request, Sample, TimeSeries};
21use api::v1::{RowInsertRequest, Rows, Value};
22use bytes::Bytes;
23use common_grpc::precision::Precision;
24use common_query::prelude::{greptime_timestamp, greptime_value};
25use pipeline::{ContextOpt, ContextReq};
26use prost::Message;
27use snafu::{OptionExt, ResultExt, ensure};
28
29use crate::error::{self, Result};
30use crate::prom_remote_write::row_builder::PromCtx;
31use crate::prom_remote_write::try_decompress;
32#[allow(deprecated)]
33use crate::prom_store::{
34    DATABASE_LABEL, DATABASE_LABEL_ALT, METRIC_NAME_LABEL, PHYSICAL_TABLE_LABEL,
35    PHYSICAL_TABLE_LABEL_ALT, SCHEMA_LABEL,
36};
37use crate::row_writer::{self, TableData};
38
/// Owned label name/value pairs that are written as tag columns for a series.
type PromTags = Vec<(String, String)>;
/// Output of `resolve_series_labels`: the per-series context, the table name taken from
/// the metric-name label, and the remaining labels as tags.
type ResolvedSeriesLabels = (PromCtx, String, PromTags);
41
42pub(crate) fn decode_remote_write_v2_request(is_zstd: bool, body: Bytes) -> Result<Request> {
43    let _timer = crate::metrics::METRIC_HTTP_PROM_STORE_DECODE_ELAPSED.start_timer();
44
45    // Match the v1 decoder's VictoriaMetrics fallback: some clients may send a
46    // mismatched content-encoding header, so try the other compression on failure.
47    let buf = if let Ok(buf) = try_decompress(is_zstd, &body[..]) {
48        buf
49    } else {
50        try_decompress(!is_zstd, &body[..])?
51    };
52
53    Request::decode(&buf[..]).context(error::DecodePromRemoteRequestSnafu)
54}
55
/// Extension trait for converting a decoded remote write v2 request into a [`ContextReq`].
pub(crate) trait RemoteWriteV2RequestExt {
    /// Consumes `self` and converts it into a [`ContextReq`].
    ///
    /// # Errors
    ///
    /// Returns an error when the conversion fails.
    fn into_context_req(self) -> Result<ContextReq>;
}
59
60impl RemoteWriteV2RequestExt for Request {
61    fn into_context_req(self) -> Result<ContextReq> {
62        let _timer = crate::metrics::METRIC_HTTP_PROM_STORE_CONVERT_ELAPSED.start_timer();
63        let Request {
64            symbols,
65            timeseries,
66        } = self;
67
68        ensure!(
69            symbols.first().map(|s| s.as_str()) == Some(""),
70            error::InvalidPromRemoteRequestSnafu {
71                msg: "remote write v2 symbols must start with an empty string".to_string(),
72            }
73        );
74
75        let mut tables = HashMap::<PromCtx, HashMap<String, TableData>>::new();
76
77        for series in timeseries {
78            // Native histograms and exemplars are intentionally ignored for now.
79            // They will be converted into Greptime rows once their ingestion path is implemented.
80            let sample_count = series.samples.len();
81            if sample_count == 0 {
82                continue;
83            }
84
85            let (prom_ctx, table_name, tags) = resolve_series_labels(&symbols, &series)?;
86            let table_data = match tables.entry(prom_ctx).or_default().entry(table_name) {
87                Entry::Occupied(entry) => {
88                    let table_data = entry.into_mut();
89                    table_data.reserve_rows(sample_count);
90                    table_data
91                }
92                Entry::Vacant(entry) => entry.insert(TableData::new(tags.len() + 2, sample_count)),
93            };
94
95            write_samples(table_data, series.samples, tags)?;
96        }
97
98        Ok(into_context_req(tables))
99    }
100}
101
102fn write_samples(
103    table_data: &mut TableData,
104    mut samples: Vec<Sample>,
105    tags: PromTags,
106) -> Result<()> {
107    let Some(last_sample) = samples.pop() else {
108        return Ok(());
109    };
110
111    for sample in &samples {
112        write_sample(table_data, sample, tags.iter().cloned())?;
113    }
114
115    write_sample(table_data, &last_sample, tags.into_iter())
116}
117
118fn write_sample(
119    table_data: &mut TableData,
120    sample: &Sample,
121    tags: impl Iterator<Item = (String, String)>,
122) -> Result<()> {
123    let mut row = table_data.alloc_one_row();
124    row_writer::write_ts_to_millis(
125        table_data,
126        greptime_timestamp(),
127        Some(sample.timestamp),
128        Precision::Millisecond,
129        &mut row,
130    )?;
131    row_writer::write_f64(table_data, greptime_value(), sample.value, &mut row)?;
132    row_writer::write_tags(table_data, tags, &mut row)?;
133    table_data.add_row(row);
134
135    Ok(())
136}
137
138fn resolve_series_labels(symbols: &[String], series: &TimeSeries) -> Result<ResolvedSeriesLabels> {
139    ensure!(
140        series.labels_refs.len().is_multiple_of(2),
141        error::InvalidPromRemoteRequestSnafu {
142            msg: "remote write v2 labels_refs must contain name/value pairs".to_string(),
143        }
144    );
145
146    let mut prom_ctx = PromCtx::default();
147    let mut table_name = None;
148    let mut tags = Vec::with_capacity(series.labels_refs.len() / 2);
149    let mut label_names = HashSet::with_capacity(series.labels_refs.len() / 2);
150
151    for pair in series.labels_refs.chunks_exact(2) {
152        let name = symbol_ref(symbols, pair[0], "label name")?;
153        let value = symbol_ref(symbols, pair[1], "label value")?;
154        validate_label(name, value)?;
155        ensure!(
156            label_names.insert(name),
157            error::InvalidPromRemoteRequestSnafu {
158                msg: format!("remote write v2 label name `{name}` is repeated"),
159            }
160        );
161
162        if name == METRIC_NAME_LABEL {
163            table_name = Some(value.to_string());
164            continue;
165        }
166        if apply_remote_write_special_label(name, value, &mut prom_ctx) {
167            continue;
168        }
169
170        tags.push((name.to_string(), value.to_string()));
171    }
172
173    let table_name = table_name.context(error::InvalidPromRemoteRequestSnafu {
174        msg: "missing '__name__' label in time-series".to_string(),
175    })?;
176
177    Ok((prom_ctx, table_name, tags))
178}
179
180fn validate_label(name: &str, value: &str) -> Result<()> {
181    ensure!(
182        !name.is_empty(),
183        error::InvalidPromRemoteRequestSnafu {
184            msg: "remote write v2 label names must not be empty".to_string(),
185        }
186    );
187    ensure!(
188        !value.is_empty(),
189        error::InvalidPromRemoteRequestSnafu {
190            msg: format!("remote write v2 label `{name}` value must not be empty"),
191        }
192    );
193
194    Ok(())
195}
196
197fn symbol_ref<'a>(symbols: &'a [String], idx: u32, field: &str) -> Result<&'a str> {
198    symbols
199        .get(idx as usize)
200        .map(String::as_str)
201        .with_context(|| error::InvalidPromRemoteRequestSnafu {
202            msg: format!(
203                "remote write v2 {field} symbol reference {idx} is out of range, symbols len: {}",
204                symbols.len()
205            ),
206        })
207}
208
209#[allow(deprecated)]
210fn apply_remote_write_special_label(name: &str, value: &str, prom_ctx: &mut PromCtx) -> bool {
211    match name {
212        SCHEMA_LABEL => {
213            prom_ctx.schema = Some(value.to_string());
214            true
215        }
216        DATABASE_LABEL | DATABASE_LABEL_ALT => {
217            if prom_ctx.schema.is_none() {
218                prom_ctx.schema = Some(value.to_string());
219            }
220            true
221        }
222        PHYSICAL_TABLE_LABEL | PHYSICAL_TABLE_LABEL_ALT => {
223            prom_ctx.physical_table = Some(value.to_string());
224            true
225        }
226        _ => false,
227    }
228}
229
230fn into_context_req(tables: HashMap<PromCtx, HashMap<String, TableData>>) -> ContextReq {
231    let mut ctx_req = ContextReq::default();
232    for (prom_ctx, tables) in tables {
233        let mut opt = ContextOpt::default();
234        if let Some(schema) = prom_ctx.schema {
235            opt.set_schema(schema);
236        }
237        if let Some(physical_table) = prom_ctx.physical_table {
238            opt.set_physical_table(physical_table);
239        }
240
241        ctx_req.add_rows(
242            opt,
243            tables.into_iter().map(|(table_name, table_data)| {
244                table_data_to_row_insert_request(table_name, table_data)
245            }),
246        );
247    }
248    ctx_req
249}
250
251fn table_data_to_row_insert_request(table_name: String, table_data: TableData) -> RowInsertRequest {
252    let num_columns = table_data.num_columns();
253    let (schema, mut rows) = table_data.into_schema_and_rows();
254    for row in &mut rows {
255        if num_columns > row.values.len() {
256            row.values.resize(num_columns, Value { value_data: None });
257        }
258    }
259
260    RowInsertRequest {
261        table_name,
262        rows: Some(Rows { schema, rows }),
263    }
264}
265
/// Helpers for building remote write v2 requests in tests.
#[cfg(any(test, feature = "testing"))]
pub mod test_util {
    use api::greptime_proto::io::prometheus::write::v2::{Histogram, Request, Sample, TimeSeries};

    /// Builds a single-series request carrying `labels` and `samples` and no histograms.
    pub fn request_with_labels_and_samples(
        labels: Vec<(&str, &str)>,
        samples: Vec<Sample>,
    ) -> Request {
        let no_histograms = Vec::new();
        request_with_labels(labels, samples, no_histograms)
    }

    /// Builds a single-series request carrying `labels` and `histograms` and no samples.
    pub fn request_with_labels_and_histograms(
        labels: Vec<(&str, &str)>,
        histograms: Vec<Histogram>,
    ) -> Request {
        let no_samples = Vec::new();
        request_with_labels(labels, no_samples, histograms)
    }

    /// Returns a histogram with only `timestamp` set; all other fields keep their defaults.
    pub fn histogram(timestamp: i64) -> Histogram {
        Histogram {
            timestamp,
            ..Histogram::default()
        }
    }

    /// Builds a request with one time series, interning the label strings into a symbol
    /// table whose first entry is the empty string.
    fn request_with_labels(
        labels: Vec<(&str, &str)>,
        samples: Vec<Sample>,
        histograms: Vec<Histogram>,
    ) -> Request {
        let mut symbols = vec![String::new()];
        let mut labels_refs = Vec::with_capacity(labels.len() * 2);
        for (name, value) in labels {
            // The name is interned before the value so symbol indexes stay predictable.
            let name_ref = push_symbol(&mut symbols, name);
            let value_ref = push_symbol(&mut symbols, value);
            labels_refs.extend([name_ref, value_ref]);
        }

        let series = TimeSeries {
            labels_refs,
            samples,
            histograms,
            exemplars: Vec::new(),
            metadata: None,
        };

        Request {
            symbols,
            timeseries: vec![series],
        }
    }

    /// Returns the index of `symbol` in `symbols`, appending it first when it is absent.
    fn push_symbol(symbols: &mut Vec<String>, symbol: &str) -> u32 {
        let idx = match symbols.iter().position(|existing| existing == symbol) {
            Some(found) => found,
            None => {
                symbols.push(symbol.to_string());
                symbols.len() - 1
            }
        };
        idx as u32
    }
}
325
/// Unit tests for decoding remote write v2 requests and converting them into row inserts.
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use api::v1::value::ValueData;
    use common_query::prelude::{greptime_timestamp, greptime_value};
    use session::context::QueryContext;

    use super::*;
    use crate::error;
    use crate::http::prom_store::PHYSICAL_TABLE_PARAM;
    use crate::prom_store::{DATABASE_LABEL, PHYSICAL_TABLE_LABEL};

    /// A snappy-compressed request decodes back to the same symbols, labels, sample and
    /// metadata when `is_zstd` is `false`.
    #[test]
    fn test_decode_remote_write_v2_request() {
        let request = Request {
            symbols: vec![
                "".to_string(),
                "__name__".to_string(),
                "http_requests_total".to_string(),
            ],
            timeseries: vec![TimeSeries {
                labels_refs: vec![1, 2],
                samples: vec![Sample {
                    value: 42.0,
                    timestamp: 1000,
                    start_timestamp: 0,
                }],
                histograms: Vec::new(),
                exemplars: Vec::new(),
                metadata: Some(Metadata {
                    r#type: metadata::MetricType::Counter as i32,
                    help_ref: 0,
                    unit_ref: 0,
                }),
            }],
        };
        let body =
            Bytes::from(crate::prom_store::snappy_compress(&request.encode_to_vec()).unwrap());

        let decoded = decode_remote_write_v2_request(false, body).unwrap();

        assert_eq!(decoded.symbols, request.symbols);
        assert_eq!(decoded.timeseries.len(), 1);
        assert_eq!(decoded.timeseries[0].labels_refs, vec![1, 2]);
        assert_eq!(decoded.timeseries[0].samples.len(), 1);
        assert_eq!(decoded.timeseries[0].samples[0].value, 42.0);
        assert_eq!(decoded.timeseries[0].metadata.as_ref().unwrap().r#type, 1);
    }

    /// Two samples of one series become two rows in a single table, with the schema
    /// ordered as timestamp, value, then the tags in label order.
    #[test]
    fn test_into_context_req_samples() {
        let ctx_req = test_util::request_with_labels_and_samples(
            vec![
                (METRIC_NAME_LABEL, "http_requests_total"),
                ("job", "api"),
                ("instance", "localhost:9090"),
            ],
            vec![
                Sample {
                    value: 42.0,
                    timestamp: 1000,
                    start_timestamp: 0,
                },
                Sample {
                    value: 43.0,
                    timestamp: 2000,
                    start_timestamp: 0,
                },
            ],
        )
        .into_context_req()
        .unwrap();

        let mut inserts = ctx_req.all_req().collect::<Vec<_>>();
        assert_eq!(inserts.len(), 1);

        let request = inserts.pop().unwrap();
        assert_eq!(request.table_name, "http_requests_total");
        let rows = request.rows.unwrap();
        assert_eq!(rows.rows.len(), 2);
        assert_eq!(
            rows.schema
                .iter()
                .map(|col| col.column_name.as_str())
                .collect::<Vec<_>>(),
            vec![greptime_timestamp(), greptime_value(), "job", "instance"]
        );
        assert_eq!(
            rows.rows[0].values[0].value_data,
            Some(ValueData::TimestampMillisecondValue(1000))
        );
        assert_eq!(
            rows.rows[0].values[1].value_data,
            Some(ValueData::F64Value(42.0))
        );
        assert_eq!(
            rows.rows[0].values[2].value_data,
            Some(ValueData::StringValue("api".to_string()))
        );
        assert_eq!(
            rows.rows[0].values[3].value_data,
            Some(ValueData::StringValue("localhost:9090".to_string()))
        );
        assert_eq!(
            rows.rows[1].values[0].value_data,
            Some(ValueData::TimestampMillisecondValue(2000))
        );
        assert_eq!(
            rows.rows[1].values[1].value_data,
            Some(ValueData::F64Value(43.0))
        );
    }

    /// Label names containing dots or non-ASCII characters are kept verbatim as column
    /// names.
    #[test]
    fn test_into_context_req_accepts_utf8_label_names() {
        let ctx_req = test_util::request_with_labels_and_samples(
            vec![
                (METRIC_NAME_LABEL, "http_requests_total"),
                ("service.name", "api"),
                ("区域", "华东"),
            ],
            vec![Sample {
                value: 42.0,
                timestamp: 1000,
                start_timestamp: 0,
            }],
        )
        .into_context_req()
        .unwrap();

        let mut inserts = ctx_req.all_req().collect::<Vec<_>>();
        assert_eq!(inserts.len(), 1);
        let rows = inserts.pop().unwrap().rows.unwrap();
        assert_eq!(
            rows.schema
                .iter()
                .map(|col| col.column_name.as_str())
                .collect::<Vec<_>>(),
            vec![
                greptime_timestamp(),
                greptime_value(),
                "service.name",
                "区域"
            ]
        );
        assert_eq!(
            rows.rows[0].values[2].value_data,
            Some(ValueData::StringValue("api".to_string()))
        );
        assert_eq!(
            rows.rows[0].values[3].value_data,
            Some(ValueData::StringValue("华东".to_string()))
        );
    }

    /// The database and physical-table labels end up in the query context instead of
    /// the row schema.
    #[test]
    fn test_into_context_req_special_labels() {
        let ctx_req = test_util::request_with_labels_and_samples(
            vec![
                (METRIC_NAME_LABEL, "cpu_usage"),
                (DATABASE_LABEL, "tenant_a"),
                (PHYSICAL_TABLE_LABEL, "metrics_physical"),
                ("job", "api"),
            ],
            vec![Sample {
                value: 1.0,
                timestamp: 1000,
                start_timestamp: 0,
            }],
        )
        .into_context_req()
        .unwrap();

        let mut iter = ctx_req.as_req_iter(Arc::new(QueryContext::with("greptime", "public")));
        let (ctx, reqs) = iter.next().unwrap();
        assert!(iter.next().is_none());

        assert_eq!(ctx.current_schema(), "tenant_a");
        assert_eq!(
            ctx.extension(PHYSICAL_TABLE_PARAM),
            Some("metrics_physical")
        );
        assert_eq!(reqs.inserts.len(), 1);

        let rows = reqs.inserts[0].rows.as_ref().unwrap();
        assert_eq!(
            rows.schema
                .iter()
                .map(|col| col.column_name.as_str())
                .collect::<Vec<_>>(),
            vec![greptime_timestamp(), greptime_value(), "job"]
        );
    }

    /// Each malformed request is rejected with an `InvalidPromRemoteRequest` error whose
    /// message contains the expected fragment.
    #[test]
    fn test_into_context_req_rejects_invalid_requests() {
        // Each case is (description, request, expected error fragment).
        let mut cases = Vec::new();

        cases.push((
            "missing metric name",
            request_with_sample(vec![("job", "api")]),
            "missing '__name__'",
        ));

        // An extra trailing reference leaves labels_refs with an odd length.
        let mut request = request_with_sample(vec![(METRIC_NAME_LABEL, "metric")]);
        request.timeseries[0].labels_refs.push(1);
        cases.push((
            "odd label refs",
            request,
            "labels_refs must contain name/value pairs",
        ));

        // Point the value reference past the end of the symbol table.
        let mut request = request_with_sample(vec![(METRIC_NAME_LABEL, "metric")]);
        request.timeseries[0].labels_refs[1] = 99;
        cases.push((
            "out of range symbol ref",
            request,
            "symbol reference 99 is out of range",
        ));

        // The first symbol must be the empty string.
        let mut request = request_with_sample(vec![(METRIC_NAME_LABEL, "metric")]);
        request.symbols[0] = "not-empty".to_string();
        cases.push((
            "non-empty first symbol",
            request,
            "symbols must start with an empty string",
        ));

        cases.push((
            "repeated label name",
            request_with_sample(vec![
                (METRIC_NAME_LABEL, "metric"),
                ("job", "api"),
                ("job", "worker"),
            ]),
            "label name `job` is repeated",
        ));

        cases.push((
            "empty label name",
            request_with_sample(vec![(METRIC_NAME_LABEL, "metric"), ("", "api")]),
            "label names must not be empty",
        ));

        cases.push((
            "empty label value",
            request_with_sample(vec![(METRIC_NAME_LABEL, "metric"), ("job", "")]),
            "label `job` value must not be empty",
        ));

        for (name, request, expected) in cases {
            assert_invalid(name, request, expected);
        }
    }

    /// Histograms and exemplars attached to a series with samples do not change the
    /// number of insert requests.
    #[test]
    fn test_into_context_req_ignores_histograms_and_exemplars() {
        let mut request = test_util::request_with_labels_and_samples(
            vec![(METRIC_NAME_LABEL, "metric")],
            vec![Sample {
                value: 1.0,
                timestamp: 1000,
                start_timestamp: 0,
            }],
        );
        request.timeseries[0].histograms.push(Histogram::default());
        request.timeseries[0].exemplars.push(Exemplar::default());

        let ctx_req = request.into_context_req().unwrap();

        assert_eq!(ctx_req.all_req().count(), 1);
    }

    /// A series that carries only histograms produces no insert requests.
    #[test]
    fn test_into_context_req_skips_histogram_only_series() {
        let mut request =
            test_util::request_with_labels_and_samples(vec![(METRIC_NAME_LABEL, "metric")], vec![]);
        request.timeseries[0].histograms.push(Histogram::default());

        let ctx_req = request.into_context_req().unwrap();

        assert_eq!(ctx_req.all_req().count(), 0);
    }

    /// Builds a request with the given labels and one fixed sample (value 1.0 at
    /// timestamp 1000).
    fn request_with_sample(labels: Vec<(&str, &str)>) -> Request {
        test_util::request_with_labels_and_samples(
            labels,
            vec![Sample {
                value: 1.0,
                timestamp: 1000,
                start_timestamp: 0,
            }],
        )
    }

    /// Asserts that converting `request` fails with an `InvalidPromRemoteRequest` error
    /// whose message contains `expected`; `name` labels the case in failure output.
    fn assert_invalid(name: &str, request: Request, expected: &str) {
        let err = request.into_context_req().unwrap_err();
        assert!(
            matches!(err, error::Error::InvalidPromRemoteRequest { .. }),
            "{name}: expected invalid request error, got {err}"
        );
        assert!(
            err.to_string().contains(expected),
            "{name}: expected error containing {expected:?}, got {err}"
        );
    }
}