servers/http/
loki.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::BTreeMap;
16use std::sync::Arc;
17use std::time::Instant;
18
19use ahash::{HashMap, HashMapExt};
20use api::v1::value::ValueData;
21use api::v1::{
22    ColumnDataType, ColumnDataTypeExtension, ColumnSchema, JsonTypeExtension, Row,
23    RowInsertRequest, RowInsertRequests, Rows, SemanticType, Value as GreptimeValue,
24};
25use axum::extract::State;
26use axum::Extension;
27use axum_extra::TypedHeader;
28use bytes::Bytes;
29use common_query::prelude::GREPTIME_TIMESTAMP;
30use common_query::{Output, OutputData};
31use common_telemetry::{error, warn};
32use headers::ContentType;
33use jsonb::Value;
34use lazy_static::lazy_static;
35use loki_proto::prost_types::Timestamp;
36use prost::Message;
37use quoted_string::test_utils::TestSpec;
38use session::context::{Channel, QueryContext};
39use snafu::{ensure, OptionExt, ResultExt};
40
41use crate::error::{
42    DecodeOtlpRequestSnafu, InvalidLokiLabelsSnafu, InvalidLokiPayloadSnafu, ParseJsonSnafu,
43    Result, UnsupportedContentTypeSnafu,
44};
45use crate::http::event::{LogState, JSON_CONTENT_TYPE, PB_CONTENT_TYPE};
46use crate::http::extractor::LogTableName;
47use crate::http::result::greptime_result_v1::GreptimedbV1Response;
48use crate::http::HttpResponse;
49use crate::metrics::{
50    METRIC_FAILURE_VALUE, METRIC_LOKI_LOGS_INGESTION_COUNTER, METRIC_LOKI_LOGS_INGESTION_ELAPSED,
51    METRIC_SUCCESS_VALUE,
52};
53use crate::{prom_store, unwrap_or_warn_continue};
54
/// Table that receives the logs when the request does not name one.
const LOKI_TABLE_NAME: &str = "loki_logs";
/// Name of the column holding the log line text.
const LOKI_LINE_COLUMN: &str = "line";
/// Name of the column holding the structured metadata of each log entry.
const LOKI_STRUCTURED_METADATA_COLUMN: &str = "structured_metadata";

/// Top-level key of the JSON payload holding the array of streams.
const STREAMS_KEY: &str = "streams";
/// Per-stream key of the JSON payload holding the label object.
const LABEL_KEY: &str = "stream";
/// Per-stream key of the JSON payload holding the array of log lines.
const LINES_KEY: &str = "values";
62
lazy_static! {
    // Columns every Loki table starts with. The order matters: rows are
    // built positionally, so index 0 is the timestamp, index 1 the log line
    // and index 2 the structured metadata. Label columns are appended after
    // these while a request is being processed.
    static ref LOKI_INIT_SCHEMAS: Vec<ColumnSchema> = vec![
        // index 0: nanosecond timestamp of the log entry
        ColumnSchema {
            column_name: GREPTIME_TIMESTAMP.to_string(),
            datatype: ColumnDataType::TimestampNanosecond.into(),
            semantic_type: SemanticType::Timestamp.into(),
            datatype_extension: None,
            options: None,
        },
        // index 1: the log line itself
        ColumnSchema {
            column_name: LOKI_LINE_COLUMN.to_string(),
            datatype: ColumnDataType::String.into(),
            semantic_type: SemanticType::Field.into(),
            datatype_extension: None,
            options: None,
        },
        // index 2: structured metadata, a binary column flagged as JSON
        // through the datatype extension
        ColumnSchema {
            column_name: LOKI_STRUCTURED_METADATA_COLUMN.to_string(),
            datatype: ColumnDataType::Binary.into(),
            semantic_type: SemanticType::Field.into(),
            datatype_extension: Some(ColumnDataTypeExtension {
                type_ext: Some(api::v1::column_data_type_extension::TypeExt::JsonType(
                    JsonTypeExtension::JsonBinary.into()
                ))
            }),
            options: None,
        }
    ];
}
92
93#[axum_macros::debug_handler]
94pub async fn loki_ingest(
95    State(log_state): State<LogState>,
96    Extension(mut ctx): Extension<QueryContext>,
97    TypedHeader(content_type): TypedHeader<ContentType>,
98    LogTableName(table_name): LogTableName,
99    bytes: Bytes,
100) -> Result<HttpResponse> {
101    ctx.set_channel(Channel::Loki);
102    let ctx = Arc::new(ctx);
103    let table_name = table_name.unwrap_or_else(|| LOKI_TABLE_NAME.to_string());
104    let db = ctx.get_db_string();
105    let db_str = db.as_str();
106    let exec_timer = Instant::now();
107
108    // init schemas
109    let mut schemas = LOKI_INIT_SCHEMAS.clone();
110
111    let mut rows = match content_type {
112        x if x == *JSON_CONTENT_TYPE => handle_json_req(bytes, &mut schemas).await,
113        x if x == *PB_CONTENT_TYPE => handle_pb_req(bytes, &mut schemas).await,
114        _ => UnsupportedContentTypeSnafu { content_type }.fail(),
115    }?;
116
117    // fill Null for missing values
118    for row in rows.iter_mut() {
119        row.resize(schemas.len(), GreptimeValue::default());
120    }
121
122    let rows = Rows {
123        rows: rows.into_iter().map(|values| Row { values }).collect(),
124        schema: schemas,
125    };
126    let ins_req = RowInsertRequest {
127        table_name,
128        rows: Some(rows),
129    };
130    let ins_reqs = RowInsertRequests {
131        inserts: vec![ins_req],
132    };
133
134    let handler = log_state.log_handler;
135    let output = handler.insert(ins_reqs, ctx).await;
136
137    if let Ok(Output {
138        data: OutputData::AffectedRows(rows),
139        meta: _,
140    }) = &output
141    {
142        METRIC_LOKI_LOGS_INGESTION_COUNTER
143            .with_label_values(&[db_str])
144            .inc_by(*rows as u64);
145        METRIC_LOKI_LOGS_INGESTION_ELAPSED
146            .with_label_values(&[db_str, METRIC_SUCCESS_VALUE])
147            .observe(exec_timer.elapsed().as_secs_f64());
148    } else {
149        METRIC_LOKI_LOGS_INGESTION_ELAPSED
150            .with_label_values(&[db_str, METRIC_FAILURE_VALUE])
151            .observe(exec_timer.elapsed().as_secs_f64());
152    }
153
154    let response = GreptimedbV1Response::from_output(vec![output])
155        .await
156        .with_execution_time(exec_timer.elapsed().as_millis() as u64);
157    Ok(response)
158}
159
160async fn handle_json_req(
161    bytes: Bytes,
162    schemas: &mut Vec<ColumnSchema>,
163) -> Result<Vec<Vec<GreptimeValue>>> {
164    let mut column_indexer: HashMap<String, u16> = HashMap::new();
165    column_indexer.insert(GREPTIME_TIMESTAMP.to_string(), 0);
166    column_indexer.insert(LOKI_LINE_COLUMN.to_string(), 1);
167
168    let payload: serde_json::Value =
169        serde_json::from_slice(bytes.as_ref()).context(ParseJsonSnafu)?;
170
171    let streams = payload
172        .get(STREAMS_KEY)
173        .context(InvalidLokiPayloadSnafu {
174            msg: "missing streams",
175        })?
176        .as_array()
177        .context(InvalidLokiPayloadSnafu {
178            msg: "streams is not an array",
179        })?;
180
181    let mut rows = Vec::with_capacity(1000);
182
183    for (stream_index, stream) in streams.iter().enumerate() {
184        // parse lines first
185        // do not use `?` in case there are multiple streams
186        let lines = unwrap_or_warn_continue!(
187            stream.get(LINES_KEY),
188            "missing values on stream {}",
189            stream_index
190        );
191        let lines = unwrap_or_warn_continue!(
192            lines.as_array(),
193            "values is not an array on stream {}",
194            stream_index
195        );
196
197        // get labels
198        let labels = stream
199            .get(LABEL_KEY)
200            .and_then(|label| label.as_object())
201            .map(|l| {
202                l.iter()
203                    .filter_map(|(k, v)| v.as_str().map(|v| (k.clone(), v.to_string())))
204                    .collect::<BTreeMap<String, String>>()
205            });
206
207        // process each line
208        for (line_index, line) in lines.iter().enumerate() {
209            let line = unwrap_or_warn_continue!(
210                line.as_array(),
211                "missing line on stream {} index {}",
212                stream_index,
213                line_index
214            );
215            if line.len() < 2 {
216                warn!(
217                    "line on stream {} index {} is too short",
218                    stream_index, line_index
219                );
220                continue;
221            }
222            // get ts
223            let ts = unwrap_or_warn_continue!(
224                line.first()
225                    .and_then(|ts| ts.as_str())
226                    .and_then(|ts| ts.parse::<i64>().ok()),
227                "missing or invalid timestamp on stream {} index {}",
228                stream_index,
229                line_index
230            );
231            // get line
232            let line_text = unwrap_or_warn_continue!(
233                line.get(1)
234                    .and_then(|line| line.as_str())
235                    .map(|line| line.to_string()),
236                "missing or invalid line on stream {} index {}",
237                stream_index,
238                line_index
239            );
240
241            let structured_metadata = match line.get(2) {
242                Some(sdata) if sdata.is_object() => sdata
243                    .as_object()
244                    .unwrap()
245                    .iter()
246                    .filter_map(|(k, v)| v.as_str().map(|s| (k.clone(), Value::String(s.into()))))
247                    .collect(),
248                _ => BTreeMap::new(),
249            };
250            let structured_metadata = Value::Object(structured_metadata);
251
252            let mut row = init_row(schemas.len(), ts, line_text, structured_metadata);
253
254            process_labels(&mut column_indexer, schemas, &mut row, labels.as_ref());
255
256            rows.push(row);
257        }
258    }
259
260    Ok(rows)
261}
262
263async fn handle_pb_req(
264    bytes: Bytes,
265    schemas: &mut Vec<ColumnSchema>,
266) -> Result<Vec<Vec<GreptimeValue>>> {
267    let decompressed = prom_store::snappy_decompress(&bytes).unwrap();
268    let req = loki_proto::logproto::PushRequest::decode(&decompressed[..])
269        .context(DecodeOtlpRequestSnafu)?;
270
271    let mut column_indexer: HashMap<String, u16> = HashMap::new();
272    column_indexer.insert(GREPTIME_TIMESTAMP.to_string(), 0);
273    column_indexer.insert(LOKI_LINE_COLUMN.to_string(), 1);
274
275    let cnt = req.streams.iter().map(|s| s.entries.len()).sum::<usize>();
276    let mut rows = Vec::with_capacity(cnt);
277
278    for stream in req.streams {
279        let labels = parse_loki_labels(&stream.labels)
280            .inspect_err(|e| {
281                error!(e; "failed to parse loki labels");
282            })
283            .ok();
284
285        // process entries
286        for entry in stream.entries {
287            let ts = if let Some(ts) = entry.timestamp {
288                ts
289            } else {
290                continue;
291            };
292            let line = entry.line;
293
294            let structured_metadata = entry
295                .structured_metadata
296                .into_iter()
297                .map(|d| (d.name, Value::String(d.value.into())))
298                .collect::<BTreeMap<String, Value>>();
299            let structured_metadata = Value::Object(structured_metadata);
300
301            let mut row = init_row(
302                schemas.len(),
303                prost_ts_to_nano(&ts),
304                line,
305                structured_metadata,
306            );
307
308            process_labels(&mut column_indexer, schemas, &mut row, labels.as_ref());
309
310            rows.push(row);
311        }
312    }
313
314    Ok(rows)
315}
316
317/// since we're hand-parsing the labels, if any error is encountered, we'll just skip the label
318/// note: pub here for bench usage
319/// ref:
320/// 1. encoding: https://github.com/grafana/alloy/blob/be34410b9e841cc0c37c153f9550d9086a304bca/internal/component/common/loki/client/batch.go#L114-L145
321/// 2. test data: https://github.com/grafana/loki/blob/a24ef7b206e0ca63ee74ca6ecb0a09b745cd2258/pkg/push/types_test.go
322pub fn parse_loki_labels(labels: &str) -> Result<BTreeMap<String, String>> {
323    let mut labels = labels.trim();
324    ensure!(
325        labels.len() >= 2,
326        InvalidLokiLabelsSnafu {
327            msg: "labels string too short"
328        }
329    );
330    ensure!(
331        labels.starts_with("{"),
332        InvalidLokiLabelsSnafu {
333            msg: "missing `{` at the beginning"
334        }
335    );
336    ensure!(
337        labels.ends_with("}"),
338        InvalidLokiLabelsSnafu {
339            msg: "missing `}` at the end"
340        }
341    );
342
343    let mut result = BTreeMap::new();
344    labels = &labels[1..labels.len() - 1];
345
346    while !labels.is_empty() {
347        // parse key
348        let first_index = labels.find("=").with_context(|| InvalidLokiLabelsSnafu {
349            msg: format!("missing `=` near: {}", labels),
350        })?;
351        let key = &labels[..first_index];
352        labels = &labels[first_index + 1..];
353
354        // parse value
355        let qs = quoted_string::parse::<TestSpec>(labels)
356            .map_err(|e| {
357                InvalidLokiLabelsSnafu {
358                    msg: format!(
359                        "failed to parse quoted string near: {}, reason: {}",
360                        labels, e.1
361                    ),
362                }
363                .build()
364            })?
365            .quoted_string;
366
367        labels = &labels[qs.len()..];
368
369        let value = quoted_string::to_content::<TestSpec>(qs).map_err(|e| {
370            InvalidLokiLabelsSnafu {
371                msg: format!("failed to unquote the string: {}, reason: {}", qs, e),
372            }
373            .build()
374        })?;
375
376        // insert key and value
377        result.insert(key.to_string(), value.to_string());
378
379        if labels.is_empty() {
380            break;
381        }
382        ensure!(
383            labels.starts_with(","),
384            InvalidLokiLabelsSnafu { msg: "missing `,`" }
385        );
386        labels = labels[1..].trim_start();
387    }
388
389    Ok(result)
390}
391
392#[inline]
393fn prost_ts_to_nano(ts: &Timestamp) -> i64 {
394    ts.seconds * 1_000_000_000 + ts.nanos as i64
395}
396
397fn init_row(
398    schema_len: usize,
399    ts: i64,
400    line: String,
401    structured_metadata: Value,
402) -> Vec<GreptimeValue> {
403    // create and init row
404    let mut row = Vec::with_capacity(schema_len);
405    // set ts and line
406    row.push(GreptimeValue {
407        value_data: Some(ValueData::TimestampNanosecondValue(ts)),
408    });
409    row.push(GreptimeValue {
410        value_data: Some(ValueData::StringValue(line)),
411    });
412    row.push(GreptimeValue {
413        value_data: Some(ValueData::BinaryValue(structured_metadata.to_vec())),
414    });
415    for _ in 0..(schema_len - 3) {
416        row.push(GreptimeValue { value_data: None });
417    }
418    row
419}
420
421fn process_labels(
422    column_indexer: &mut HashMap<String, u16>,
423    schemas: &mut Vec<ColumnSchema>,
424    row: &mut Vec<GreptimeValue>,
425    labels: Option<&BTreeMap<String, String>>,
426) {
427    let Some(labels) = labels else {
428        return;
429    };
430
431    // insert labels
432    for (k, v) in labels {
433        if let Some(index) = column_indexer.get(k) {
434            // exist in schema
435            // insert value using index
436            row[*index as usize] = GreptimeValue {
437                value_data: Some(ValueData::StringValue(v.clone())),
438            };
439        } else {
440            // not exist
441            // add schema and append to values
442            schemas.push(ColumnSchema {
443                column_name: k.clone(),
444                datatype: ColumnDataType::String.into(),
445                semantic_type: SemanticType::Tag.into(),
446                datatype_extension: None,
447                options: None,
448            });
449            column_indexer.insert(k.clone(), (schemas.len() - 1) as u16);
450
451            row.push(GreptimeValue {
452                value_data: Some(ValueData::StringValue(v.clone())),
453            });
454        }
455    }
456}
457
/// Evaluates to the value inside an `Option`, or logs a warning and
/// `continue`s the enclosing loop when the option is `None`.
///
/// The arguments after the expression are handed to `warn!` unchanged, so
/// they are either a single message or a format string followed by its
/// arguments. `warn!` has to be in scope where the macro is used, and the
/// macro can only be used inside a loop body.
#[macro_export]
macro_rules! unwrap_or_warn_continue {
    // message without arguments
    ($expr:expr, $msg:expr) => {
        match $expr {
            Some(value) => value,
            None => {
                warn!($msg);
                continue;
            }
        }
    };

    // format string followed by its arguments
    ($expr:expr, $fmt:expr, $($arg:tt)*) => {
        match $expr {
            Some(value) => value,
            None => {
                warn!($fmt, $($arg)*);
                continue;
            }
        }
    };
}
478
#[cfg(test)]
mod tests {
    use std::collections::BTreeMap;

    use loki_proto::prost_types::Timestamp;

    use crate::error::Error::InvalidLokiLabels;
    use crate::http::loki::{parse_loki_labels, prost_ts_to_nano};

    /// Seconds and nanos are combined into a single nanosecond value.
    #[test]
    fn test_ts_to_nano() {
        // ts = 1731748568804293888
        // seconds = 1731748568
        // nano = 804293888
        let ts = Timestamp {
            seconds: 1731748568,
            nanos: 804293888,
        };
        assert_eq!(prost_ts_to_nano(&ts), 1731748568804293888);
    }

    /// A well-formed label string is parsed into a map; every malformed
    /// variant is rejected with `InvalidLokiLabels`.
    #[test]
    fn test_parse_loki_labels() {
        let mut expected = BTreeMap::new();
        expected.insert("job".to_string(), "foobar".to_string());
        expected.insert("cluster".to_string(), "foo-central1".to_string());
        expected.insert("namespace".to_string(), "bar".to_string());
        expected.insert("container_name".to_string(), "buzz".to_string());

        // perfect case
        let valid_labels =
            r#"{job="foobar", cluster="foo-central1", namespace="bar", container_name="buzz"}"#;
        let re = parse_loki_labels(valid_labels);
        assert!(re.is_ok());
        assert_eq!(re.unwrap(), expected);

        // too short
        let too_short = r#"}"#;
        let re = parse_loki_labels(too_short);
        assert!(matches!(re.err().unwrap(), InvalidLokiLabels { .. }));

        // missing start
        let missing_start = r#"job="foobar"}"#;
        let re = parse_loki_labels(missing_start);
        assert!(matches!(re.err().unwrap(), InvalidLokiLabels { .. }));

        // missing end
        let missing_end = r#"{job="foobar""#;
        let re = parse_loki_labels(missing_end);
        assert!(matches!(re.err().unwrap(), InvalidLokiLabels { .. }));

        // missing equal
        let missing_equal = r#"{job"foobar"}"#;
        let re = parse_loki_labels(missing_equal);
        assert!(matches!(re.err().unwrap(), InvalidLokiLabels { .. }));

        // missing quote
        let missing_quote = r#"{job=foobar}"#;
        let re = parse_loki_labels(missing_quote);
        assert!(matches!(re.err().unwrap(), InvalidLokiLabels { .. }));

        // missing comma
        let missing_comma = r#"{job="foobar" cluster="foo-central1"}"#;
        let re = parse_loki_labels(missing_comma);
        assert!(matches!(re.err().unwrap(), InvalidLokiLabels { .. }));
    }
}
545}