Skip to main content

servers/http/
loki.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, VecDeque};
16use std::sync::Arc;
17use std::time::Instant;
18
19use api::v1::value::ValueData;
20use api::v1::{
21    ColumnDataType, ColumnDataTypeExtension, ColumnSchema, JsonTypeExtension, Row,
22    RowInsertRequest, Rows, SemanticType, Value as GreptimeValue,
23};
24use axum::Extension;
25use axum::extract::State;
26use axum_extra::TypedHeader;
27use bytes::Bytes;
28use chrono::DateTime;
29use common_query::prelude::greptime_timestamp;
30use common_telemetry::{error, warn};
31use headers::ContentType;
32use jsonb::Value;
33use lazy_static::lazy_static;
34use loki_proto::logproto::LabelPairAdapter;
35use loki_proto::prost_types::Timestamp as LokiTimestamp;
36use pipeline::util::to_pipeline_version;
37use pipeline::{
38    ContextReq, GreptimePipelineParams, PipelineContext, PipelineDefinition, SchemaInfo,
39};
40use prost::Message;
41use quoted_string::test_utils::TestSpec;
42use session::context::{Channel, QueryContext, QueryContextRef};
43use snafu::{OptionExt, ResultExt, ensure};
44use snap::raw::Decoder;
45use table::requests::{SEMANTIC_SIGNAL_TYPE, SEMANTIC_SOURCE, SIGNAL_TYPE_LOG, SOURCE_LOKI};
46use vrl::value::{KeyString, Value as VrlValue};
47
48use crate::error::{
49    DecodeLokiRequestSnafu, DecompressSnappyLokiRequestSnafu, InvalidLokiLabelsSnafu,
50    InvalidLokiPayloadSnafu, ParseJsonSnafu, PipelineSnafu, Result, UnsupportedContentTypeSnafu,
51};
52use crate::http::HttpResponse;
53use crate::http::event::{
54    JSON_CONTENT_TYPE, LogState, PB_CONTENT_TYPE, PipelineIngestRequest, execute_log_context_req,
55};
56use crate::http::extractor::{LogTableName, PipelineInfo};
57use crate::metrics::{METRIC_LOKI_LOGS_INGESTION_COUNTER, METRIC_LOKI_LOGS_INGESTION_ELAPSED};
58use crate::pipeline::run_pipeline;
59use crate::query_handler::PipelineHandlerRef;
60
/// Table written to when the request does not name a target table.
const LOKI_TABLE_NAME: &str = "loki_logs";

/// Direct-ingest column holding the raw log line.
const LOKI_LINE_COLUMN: &str = "line";
/// Direct-ingest column holding the entry's structured metadata as JSONB.
const LOKI_STRUCTURED_METADATA_COLUMN: &str = "structured_metadata";

/// Key under which the log line is handed to the pipeline engine.
const LOKI_LINE_COLUMN_NAME: &str = "loki_line";
/// Prefix prepended to structured-metadata keys handed to the pipeline engine.
const LOKI_PIPELINE_METADATA_PREFIX: &str = "loki_metadata_";
/// Prefix prepended to stream-label keys handed to the pipeline engine.
const LOKI_PIPELINE_LABEL_PREFIX: &str = "loki_label_";

// Field names of the Loki JSON push payload.
const STREAMS_KEY: &str = "streams";
const LABEL_KEY: &str = "stream";
const LINES_KEY: &str = "values";
73
74lazy_static! {
75    static ref LOKI_INIT_SCHEMAS: Vec<ColumnSchema> = vec![
76        ColumnSchema {
77            column_name: greptime_timestamp().to_string(),
78            datatype: ColumnDataType::TimestampNanosecond.into(),
79            semantic_type: SemanticType::Timestamp.into(),
80            datatype_extension: None,
81            options: None,
82        },
83        ColumnSchema {
84            column_name: LOKI_LINE_COLUMN.to_string(),
85            datatype: ColumnDataType::String.into(),
86            semantic_type: SemanticType::Field.into(),
87            datatype_extension: None,
88            options: None,
89        },
90        ColumnSchema {
91            column_name: LOKI_STRUCTURED_METADATA_COLUMN.to_string(),
92            datatype: ColumnDataType::Binary.into(),
93            semantic_type: SemanticType::Field.into(),
94            datatype_extension: Some(ColumnDataTypeExtension {
95                type_ext: Some(api::v1::column_data_type_extension::TypeExt::JsonType(
96                    JsonTypeExtension::JsonBinary.into()
97                ))
98            }),
99            options: None,
100        }
101    ];
102}
103
104#[axum_macros::debug_handler]
105pub async fn loki_ingest(
106    State(log_state): State<LogState>,
107    Extension(mut ctx): Extension<QueryContext>,
108    TypedHeader(content_type): TypedHeader<ContentType>,
109    LogTableName(table_name): LogTableName,
110    pipeline_info: PipelineInfo,
111    bytes: Bytes,
112) -> Result<HttpResponse> {
113    ctx.set_channel(Channel::Loki);
114    ctx.set_extension(SEMANTIC_SIGNAL_TYPE, SIGNAL_TYPE_LOG);
115    ctx.set_extension(SEMANTIC_SOURCE, SOURCE_LOKI);
116    let ctx = Arc::new(ctx);
117    let table_name = table_name.unwrap_or_else(|| LOKI_TABLE_NAME.to_string());
118    let handler = log_state.log_handler;
119    // Preserve the old elapsed metric boundary: it includes parsing, optional
120    // pipeline execution, and insertion.
121    let exec_timer = Instant::now();
122
123    let ctx_req = build_loki_context_req(
124        &handler,
125        content_type,
126        table_name,
127        pipeline_info,
128        bytes,
129        &ctx,
130    )
131    .await?;
132
133    execute_log_context_req(
134        handler,
135        ctx_req,
136        ctx,
137        exec_timer,
138        &METRIC_LOKI_LOGS_INGESTION_COUNTER,
139        &METRIC_LOKI_LOGS_INGESTION_ELAPSED,
140    )
141    .await
142}
143
/// Holder for a single Loki entry parsed from a JSON or protobuf push body.
///
/// The generic parameter is the structured-metadata representation:
/// [VrlValue] for JSON bodies and `Vec<LabelPairAdapter>` for protobuf bodies.
/// Depending on the destination, it is converted into [LokiRawItem] (direct
/// ingestion) or [LokiPipeline] (pipeline ingestion).
#[derive(Debug)]
pub struct LokiMiddleItem<T> {
    /// Entry timestamp in nanoseconds.
    pub ts: i64,
    /// The log line text.
    pub line: String,
    /// Per-entry structured metadata, if the entry carried any.
    pub structured_metadata: Option<T>,
    /// Labels of the stream this entry belongs to.
    pub labels: Option<BTreeMap<String, String>>,
}
153
/// Line item for Loki direct (raw) ingestion.
///
/// The line is persisted whole, labels become tag columns, and the structured
/// metadata is stored as a single JSONB-encoded value.
#[derive(Debug)]
pub struct LokiRawItem {
    /// Entry timestamp in nanoseconds.
    pub ts: i64,
    /// The log line text.
    pub line: String,
    /// Structured metadata already serialised as JSONB bytes.
    pub structured_metadata: Vec<u8>,
    /// Labels of the stream this entry belongs to.
    pub labels: Option<BTreeMap<String, String>>,
}
163
164/// This is the line item prepared for the pipeline engine.
165pub struct LokiPipeline {
166    pub map: VrlValue,
167}
168
169struct LokiPipelineContextReq {
170    content_type: ContentType,
171    table_name: String,
172    pipeline_name: String,
173    pipeline_version: Option<String>,
174    pipeline_params: GreptimePipelineParams,
175    bytes: Bytes,
176}
177
178async fn build_loki_context_req(
179    handler: &PipelineHandlerRef,
180    content_type: ContentType,
181    table_name: String,
182    pipeline_info: PipelineInfo,
183    bytes: Bytes,
184    ctx: &QueryContextRef,
185) -> Result<ContextReq> {
186    // A pipeline header switches Loki into the generic pipeline path; without
187    // it, Loki writes directly to the target log table.
188    match pipeline_info.pipeline_name {
189        Some(pipeline_name) => {
190            let pipeline_req = LokiPipelineContextReq {
191                content_type,
192                table_name,
193                pipeline_name,
194                pipeline_version: pipeline_info.pipeline_version,
195                pipeline_params: pipeline_info.pipeline_params,
196                bytes,
197            };
198            build_loki_pipeline_context_req(handler, pipeline_req, ctx).await
199        }
200        None => {
201            let req = build_loki_raw_insert_request(content_type, table_name, bytes)?;
202            Ok(ContextReq::default_opt_with_reqs(vec![req]))
203        }
204    }
205}
206
207async fn build_loki_pipeline_context_req(
208    handler: &PipelineHandlerRef,
209    pipeline_req: LokiPipelineContextReq,
210    ctx: &QueryContextRef,
211) -> Result<ContextReq> {
212    let LokiPipelineContextReq {
213        content_type,
214        table_name,
215        pipeline_name,
216        pipeline_version,
217        pipeline_params,
218        bytes,
219    } = pipeline_req;
220
221    let version = to_pipeline_version(pipeline_version.as_deref()).context(PipelineSnafu)?;
222    let def =
223        PipelineDefinition::from_name(&pipeline_name, version, None).context(PipelineSnafu)?;
224    let pipeline_ctx = PipelineContext::new(&def, &pipeline_params, Channel::Loki);
225
226    let values = extract_item::<LokiPipeline>(content_type, bytes)?
227        .map(|item| item.map)
228        .collect::<Vec<_>>();
229
230    let req = PipelineIngestRequest {
231        table: table_name,
232        values,
233    };
234
235    run_pipeline(handler, &pipeline_ctx, req, ctx, true).await
236}
237
238fn build_loki_raw_insert_request(
239    content_type: ContentType,
240    table_name: String,
241    bytes: Bytes,
242) -> Result<RowInsertRequest> {
243    let mut schema_info = SchemaInfo::from_schema_list(LOKI_INIT_SCHEMAS.clone());
244    let mut rows = Vec::with_capacity(256);
245    for loki_row in extract_item::<LokiRawItem>(content_type, bytes)? {
246        let mut row = init_row(
247            schema_info.schema.len(),
248            loki_row.ts,
249            loki_row.line,
250            loki_row.structured_metadata,
251        );
252        process_labels(&mut schema_info, &mut row, loki_row.labels);
253        rows.push(row);
254    }
255
256    let schemas = schema_info.column_schemas()?;
257    // Labels can introduce new tag columns after earlier rows were built.
258    for row in rows.iter_mut() {
259        row.resize(schemas.len(), GreptimeValue::default());
260    }
261    let rows = Rows {
262        rows: rows.into_iter().map(|values| Row { values }).collect(),
263        schema: schemas,
264    };
265
266    Ok(RowInsertRequest {
267        table_name,
268        rows: Some(rows),
269    })
270}
271
272/// Extract Loki entries from the supported wire format into the caller's
273/// destination type.
274///
275/// JSON push bodies become `LokiMiddleItem<VrlValue>`, protobuf push bodies
276/// become `LokiMiddleItem<Vec<LabelPairAdapter>>`, and the generic `Into<T>`
277/// conversion selects either direct-write `LokiRawItem` or pipeline `LokiPipeline`.
278fn extract_item<T>(content_type: ContentType, bytes: Bytes) -> Result<Box<dyn Iterator<Item = T>>>
279where
280    LokiMiddleItem<VrlValue>: Into<T>,
281    LokiMiddleItem<Vec<LabelPairAdapter>>: Into<T>,
282{
283    match content_type {
284        x if x == *JSON_CONTENT_TYPE => Ok(Box::new(
285            LokiJsonParser::from_bytes(bytes)?.flat_map(|item| item.into_iter().map(|i| i.into())),
286        )),
287        x if x == *PB_CONTENT_TYPE => Ok(Box::new(
288            LokiPbParser::from_bytes(bytes)?.flat_map(|item| item.into_iter().map(|i| i.into())),
289        )),
290        _ => UnsupportedContentTypeSnafu { content_type }.fail(),
291    }
292}
293
294struct LokiJsonParser {
295    pub streams: VecDeque<VrlValue>,
296}
297
298impl LokiJsonParser {
299    pub fn from_bytes(bytes: Bytes) -> Result<Self> {
300        let payload: VrlValue = serde_json::from_slice(bytes.as_ref()).context(ParseJsonSnafu)?;
301
302        let VrlValue::Object(mut map) = payload else {
303            return InvalidLokiPayloadSnafu {
304                msg: "payload is not an object",
305            }
306            .fail();
307        };
308
309        let streams = map.remove(STREAMS_KEY).context(InvalidLokiPayloadSnafu {
310            msg: "missing streams",
311        })?;
312
313        let VrlValue::Array(streams) = streams else {
314            return InvalidLokiPayloadSnafu {
315                msg: "streams is not an array",
316            }
317            .fail();
318        };
319
320        Ok(Self {
321            streams: streams.into(),
322        })
323    }
324}
325
326impl Iterator for LokiJsonParser {
327    type Item = JsonStreamItem;
328
329    fn next(&mut self) -> Option<Self::Item> {
330        while let Some(stream) = self.streams.pop_front() {
331            // get lines from the map
332            let VrlValue::Object(mut map) = stream else {
333                warn!("stream is not an object, {:?}", stream);
334                continue;
335            };
336            let Some(lines) = map.remove(LINES_KEY) else {
337                warn!("missing lines on stream, {:?}", map);
338                continue;
339            };
340            let VrlValue::Array(lines) = lines else {
341                warn!("lines is not an array, {:?}", lines);
342                continue;
343            };
344
345            // get labels
346            let labels = map
347                .remove(LABEL_KEY)
348                .and_then(|m| match m {
349                    VrlValue::Object(labels) => Some(labels),
350                    _ => None,
351                })
352                .map(|m| {
353                    m.into_iter()
354                        .filter_map(|(k, v)| match v {
355                            VrlValue::Bytes(v) => {
356                                Some((k.into(), String::from_utf8_lossy(&v).to_string()))
357                            }
358                            _ => None,
359                        })
360                        .collect::<BTreeMap<String, String>>()
361                });
362
363            return Some(JsonStreamItem {
364                lines: lines.into(),
365                labels,
366            });
367        }
368        None
369    }
370}
371
372struct JsonStreamItem {
373    pub lines: VecDeque<VrlValue>,
374    pub labels: Option<BTreeMap<String, String>>,
375}
376
377impl Iterator for JsonStreamItem {
378    type Item = LokiMiddleItem<VrlValue>;
379
380    fn next(&mut self) -> Option<Self::Item> {
381        while let Some(line) = self.lines.pop_front() {
382            let VrlValue::Array(line) = line else {
383                warn!("line is not an array, {:?}", line);
384                continue;
385            };
386            if line.len() < 2 {
387                warn!("line is too short, {:?}", line);
388                continue;
389            }
390            let mut line: VecDeque<VrlValue> = line.into();
391
392            // get ts
393            let ts = line.pop_front().and_then(|ts| match ts {
394                VrlValue::Bytes(ts) => String::from_utf8_lossy(&ts).parse::<i64>().ok(),
395                _ => {
396                    warn!("missing or invalid timestamp, {:?}", ts);
397                    None
398                }
399            });
400            let Some(ts) = ts else {
401                continue;
402            };
403
404            let line_text = line.pop_front().and_then(|l| match l {
405                VrlValue::Bytes(l) => Some(String::from_utf8_lossy(&l).to_string()),
406                _ => {
407                    warn!("missing or invalid line, {:?}", l);
408                    None
409                }
410            });
411            let Some(line_text) = line_text else {
412                continue;
413            };
414
415            let structured_metadata = line.pop_front();
416
417            return Some(LokiMiddleItem {
418                ts,
419                line: line_text,
420                structured_metadata,
421                labels: self.labels.clone(),
422            });
423        }
424        None
425    }
426}
427
428type LokiPipelineMap = BTreeMap<KeyString, VrlValue>;
429
430fn vrl_metadata_to_jsonb(structured_metadata: Option<VrlValue>) -> Vec<u8> {
431    // JSON push structured metadata arrives as a VRL object.
432    let structured_metadata = structured_metadata
433        .and_then(|metadata| match metadata {
434            VrlValue::Object(metadata) => Some(metadata),
435            _ => None,
436        })
437        .map(|metadata| {
438            metadata
439                .into_iter()
440                .filter_map(|(key, value)| match value {
441                    VrlValue::Bytes(bytes) => Some((
442                        key.into(),
443                        Value::String(String::from_utf8_lossy(&bytes).to_string().into()),
444                    )),
445                    _ => None,
446                })
447                .collect::<BTreeMap<String, Value>>()
448        })
449        .unwrap_or_default();
450
451    Value::Object(structured_metadata).to_vec()
452}
453
454fn label_pair_metadata_to_jsonb(structured_metadata: Option<Vec<LabelPairAdapter>>) -> Vec<u8> {
455    // Protobuf push structured metadata arrives as Loki label pairs.
456    let structured_metadata = structured_metadata
457        .unwrap_or_default()
458        .into_iter()
459        .map(|metadata| (metadata.name, Value::String(metadata.value.into())))
460        .collect::<BTreeMap<String, Value>>();
461
462    Value::Object(structured_metadata).to_vec()
463}
464
465fn new_loki_pipeline_map(ts: i64, line: String) -> LokiPipelineMap {
466    let mut map = BTreeMap::new();
467    map.insert(
468        KeyString::from(greptime_timestamp()),
469        VrlValue::Timestamp(DateTime::from_timestamp_nanos(ts)),
470    );
471    map.insert(
472        KeyString::from(LOKI_LINE_COLUMN_NAME),
473        VrlValue::Bytes(line.into()),
474    );
475    map
476}
477
478fn append_vrl_pipeline_metadata(map: &mut LokiPipelineMap, structured_metadata: Option<VrlValue>) {
479    if let Some(VrlValue::Object(metadata)) = structured_metadata {
480        for (key, value) in metadata {
481            map.insert(
482                KeyString::from(format!("{}{}", LOKI_PIPELINE_METADATA_PREFIX, key)),
483                value,
484            );
485        }
486    }
487}
488
489fn append_label_pair_pipeline_metadata(
490    map: &mut LokiPipelineMap,
491    structured_metadata: Option<Vec<LabelPairAdapter>>,
492) {
493    for metadata in structured_metadata.unwrap_or_default() {
494        map.insert(
495            KeyString::from(format!(
496                "{}{}",
497                LOKI_PIPELINE_METADATA_PREFIX, metadata.name
498            )),
499            VrlValue::Bytes(metadata.value.into()),
500        );
501    }
502}
503
504fn append_pipeline_labels(map: &mut LokiPipelineMap, labels: Option<BTreeMap<String, String>>) {
505    if let Some(labels) = labels {
506        for (key, value) in labels {
507            map.insert(
508                KeyString::from(format!("{}{}", LOKI_PIPELINE_LABEL_PREFIX, key)),
509                VrlValue::Bytes(value.into()),
510            );
511        }
512    }
513}
514
515impl From<LokiMiddleItem<VrlValue>> for LokiRawItem {
516    fn from(val: LokiMiddleItem<VrlValue>) -> Self {
517        let LokiMiddleItem {
518            ts,
519            line,
520            structured_metadata,
521            labels,
522        } = val;
523
524        LokiRawItem {
525            ts,
526            line,
527            structured_metadata: vrl_metadata_to_jsonb(structured_metadata),
528            labels,
529        }
530    }
531}
532
533impl From<LokiMiddleItem<VrlValue>> for LokiPipeline {
534    fn from(value: LokiMiddleItem<VrlValue>) -> Self {
535        let LokiMiddleItem {
536            ts,
537            line,
538            structured_metadata,
539            labels,
540        } = value;
541
542        let mut map = new_loki_pipeline_map(ts, line);
543        append_vrl_pipeline_metadata(&mut map, structured_metadata);
544        append_pipeline_labels(&mut map, labels);
545
546        LokiPipeline {
547            map: VrlValue::Object(map),
548        }
549    }
550}
551
552pub struct LokiPbParser {
553    pub streams: VecDeque<loki_proto::logproto::StreamAdapter>,
554}
555
556impl LokiPbParser {
557    pub fn from_bytes(bytes: Bytes) -> Result<Self> {
558        let decompressed = snappy_decompress_loki_request(&bytes)?;
559        let req = loki_proto::logproto::PushRequest::decode(&decompressed[..])
560            .context(DecodeLokiRequestSnafu)?;
561
562        Ok(Self {
563            streams: req.streams.into(),
564        })
565    }
566}
567
568fn snappy_decompress_loki_request(buf: &[u8]) -> Result<Vec<u8>> {
569    // Loki's protobuf push body is Snappy-compressed independent of HTTP
570    // content-encoding, so keep this decode step explicit.
571    let mut decoder = Decoder::new();
572    decoder
573        .decompress_vec(buf)
574        .context(DecompressSnappyLokiRequestSnafu)
575}
576
577impl Iterator for LokiPbParser {
578    type Item = PbStreamItem;
579
580    fn next(&mut self) -> Option<Self::Item> {
581        let stream = self.streams.pop_front()?;
582
583        let labels = parse_loki_labels(&stream.labels)
584            .inspect_err(|e| {
585                error!(e; "failed to parse loki labels, {:?}", stream.labels);
586            })
587            .ok();
588
589        Some(PbStreamItem {
590            entries: stream.entries.into(),
591            labels,
592        })
593    }
594}
595
596pub struct PbStreamItem {
597    pub entries: VecDeque<loki_proto::logproto::EntryAdapter>,
598    pub labels: Option<BTreeMap<String, String>>,
599}
600
601impl Iterator for PbStreamItem {
602    type Item = LokiMiddleItem<Vec<LabelPairAdapter>>;
603
604    fn next(&mut self) -> Option<Self::Item> {
605        while let Some(entry) = self.entries.pop_front() {
606            let ts = if let Some(ts) = entry.timestamp {
607                ts
608            } else {
609                warn!("missing timestamp, {:?}", entry);
610                continue;
611            };
612            let line = entry.line;
613
614            let structured_metadata = entry.structured_metadata;
615
616            return Some(LokiMiddleItem {
617                ts: prost_ts_to_nano(&ts),
618                line,
619                structured_metadata: Some(structured_metadata),
620                labels: self.labels.clone(),
621            });
622        }
623        None
624    }
625}
626
627impl From<LokiMiddleItem<Vec<LabelPairAdapter>>> for LokiRawItem {
628    fn from(val: LokiMiddleItem<Vec<LabelPairAdapter>>) -> Self {
629        let LokiMiddleItem {
630            ts,
631            line,
632            structured_metadata,
633            labels,
634        } = val;
635
636        LokiRawItem {
637            ts,
638            line,
639            structured_metadata: label_pair_metadata_to_jsonb(structured_metadata),
640            labels,
641        }
642    }
643}
644
645impl From<LokiMiddleItem<Vec<LabelPairAdapter>>> for LokiPipeline {
646    fn from(value: LokiMiddleItem<Vec<LabelPairAdapter>>) -> Self {
647        let LokiMiddleItem {
648            ts,
649            line,
650            structured_metadata,
651            labels,
652        } = value;
653
654        let mut map = new_loki_pipeline_map(ts, line);
655        append_label_pair_pipeline_metadata(&mut map, structured_metadata);
656        append_pipeline_labels(&mut map, labels);
657
658        LokiPipeline {
659            map: VrlValue::Object(map),
660        }
661    }
662}
663
664/// since we're hand-parsing the labels, if any error is encountered, we'll just skip the label
665/// note: pub here for bench usage
666/// ref:
667/// 1. encoding: https://github.com/grafana/alloy/blob/be34410b9e841cc0c37c153f9550d9086a304bca/internal/component/common/loki/client/batch.go#L114-L145
668/// 2. test data: https://github.com/grafana/loki/blob/a24ef7b206e0ca63ee74ca6ecb0a09b745cd2258/pkg/push/types_test.go
669pub fn parse_loki_labels(labels: &str) -> Result<BTreeMap<String, String>> {
670    let mut labels = labels.trim();
671    ensure!(
672        labels.len() >= 2,
673        InvalidLokiLabelsSnafu {
674            msg: "labels string too short"
675        }
676    );
677    ensure!(
678        labels.starts_with("{"),
679        InvalidLokiLabelsSnafu {
680            msg: "missing `{` at the beginning"
681        }
682    );
683    ensure!(
684        labels.ends_with("}"),
685        InvalidLokiLabelsSnafu {
686            msg: "missing `}` at the end"
687        }
688    );
689
690    let mut result = BTreeMap::new();
691    labels = &labels[1..labels.len() - 1];
692
693    while !labels.is_empty() {
694        // parse key
695        let first_index = labels.find("=").with_context(|| InvalidLokiLabelsSnafu {
696            msg: format!("missing `=` near: {}", labels),
697        })?;
698        let key = &labels[..first_index];
699        labels = &labels[first_index + 1..];
700
701        // parse value
702        let qs = quoted_string::parse::<TestSpec>(labels)
703            .map_err(|e| {
704                InvalidLokiLabelsSnafu {
705                    msg: format!(
706                        "failed to parse quoted string near: {}, reason: {}",
707                        labels, e.1
708                    ),
709                }
710                .build()
711            })?
712            .quoted_string;
713
714        labels = &labels[qs.len()..];
715
716        let value = quoted_string::to_content::<TestSpec>(qs).map_err(|e| {
717            InvalidLokiLabelsSnafu {
718                msg: format!("failed to unquote the string: {}, reason: {}", qs, e),
719            }
720            .build()
721        })?;
722
723        // insert key and value
724        result.insert(key.to_string(), value.to_string());
725
726        if labels.is_empty() {
727            break;
728        }
729        ensure!(
730            labels.starts_with(","),
731            InvalidLokiLabelsSnafu { msg: "missing `,`" }
732        );
733        labels = labels[1..].trim_start();
734    }
735
736    Ok(result)
737}
738
739#[inline]
740fn prost_ts_to_nano(ts: &LokiTimestamp) -> i64 {
741    ts.seconds * 1_000_000_000 + ts.nanos as i64
742}
743
744fn init_row(
745    schema_len: usize,
746    ts: i64,
747    line: String,
748    structured_metadata: Vec<u8>,
749) -> Vec<GreptimeValue> {
750    // create and init row
751    let mut row = Vec::with_capacity(schema_len);
752    // set ts and line
753    row.push(GreptimeValue {
754        value_data: Some(ValueData::TimestampNanosecondValue(ts)),
755    });
756    row.push(GreptimeValue {
757        value_data: Some(ValueData::StringValue(line)),
758    });
759    row.push(GreptimeValue {
760        value_data: Some(ValueData::BinaryValue(structured_metadata)),
761    });
762    for _ in 0..(schema_len - 3) {
763        row.push(GreptimeValue { value_data: None });
764    }
765    row
766}
767
768fn process_labels(
769    schema_info: &mut SchemaInfo,
770    row: &mut Vec<GreptimeValue>,
771    labels: Option<BTreeMap<String, String>>,
772) {
773    let Some(labels) = labels else {
774        return;
775    };
776
777    let column_indexer = &mut schema_info.index;
778    let schemas = &mut schema_info.schema;
779
780    // insert labels
781    for (k, v) in labels {
782        if let Some(index) = column_indexer.get(&k) {
783            // exist in schema
784            // insert value using index
785            row[*index] = GreptimeValue {
786                value_data: Some(ValueData::StringValue(v)),
787            };
788        } else {
789            // not exist
790            // add schema and append to values
791            schemas.push(
792                ColumnSchema {
793                    column_name: k.clone(),
794                    datatype: ColumnDataType::String.into(),
795                    semantic_type: SemanticType::Tag.into(),
796                    datatype_extension: None,
797                    options: None,
798                }
799                .into(),
800            );
801            column_indexer.insert(k, schemas.len() - 1);
802
803            row.push(GreptimeValue {
804                value_data: Some(ValueData::StringValue(v)),
805            });
806        }
807    }
808}
809
810#[cfg(test)]
811mod tests {
812    use std::collections::BTreeMap;
813
814    use bytes::Bytes;
815    use loki_proto::logproto::{EntryAdapter, PushRequest, StreamAdapter};
816    use loki_proto::prost_types::Timestamp;
817    use prost::Message;
818
819    use super::*;
820    use crate::error::Error::{DecompressSnappyLokiRequest, InvalidLokiLabels};
821    use crate::prom_store::snappy_compress;
822
823    const JSON_PAYLOAD: &[u8] = br#"{
824        "streams": [
825            {
826                "stream": {
827                    "job": "api",
828                    "namespace": "prod"
829                },
830                "values": [
831                    ["1731748568804293888", "line one", {"trace_id": "abc"}]
832                ]
833            },
834            {
835                "stream": {
836                    "job": "worker",
837                    "pod": "worker-0"
838                },
839                "values": [
840                    ["1731748568804293889", "line two"]
841                ]
842            }
843        ]
844    }"#;
845
846    fn row_string_value(row: &Row, index: usize) -> Option<&str> {
847        match row.values[index].value_data.as_ref() {
848            Some(ValueData::StringValue(value)) => Some(value.as_str()),
849            _ => None,
850        }
851    }
852
853    fn pipeline_bytes_value(map: &BTreeMap<KeyString, VrlValue>, key: &str) -> Option<String> {
854        match map.get(&KeyString::from(key))? {
855            VrlValue::Bytes(value) => Some(String::from_utf8_lossy(value.as_ref()).to_string()),
856            _ => None,
857        }
858    }
859
860    #[test]
861    fn test_ts_to_nano() {
862        // ts = 1731748568804293888
863        // seconds = 1731748568
864        // nano = 804293888
865        let ts = Timestamp {
866            seconds: 1731748568,
867            nanos: 804293888,
868        };
869        assert_eq!(prost_ts_to_nano(&ts), 1731748568804293888);
870    }
871
872    #[test]
873    fn test_json_direct_ingest_builds_schema_and_pads_rows() {
874        let request = build_loki_raw_insert_request(
875            JSON_CONTENT_TYPE.clone(),
876            "custom_loki".to_string(),
877            Bytes::from_static(JSON_PAYLOAD),
878        )
879        .unwrap();
880
881        assert_eq!(request.table_name, "custom_loki");
882        let rows = request.rows.unwrap();
883        let column_names = rows
884            .schema
885            .iter()
886            .map(|schema| schema.column_name.as_str())
887            .collect::<Vec<_>>();
888        assert_eq!(
889            column_names,
890            vec![
891                greptime_timestamp(),
892                LOKI_LINE_COLUMN,
893                LOKI_STRUCTURED_METADATA_COLUMN,
894                "job",
895                "namespace",
896                "pod",
897            ]
898        );
899        assert_eq!(rows.schema[3].semantic_type, SemanticType::Tag as i32);
900        assert_eq!(rows.schema[4].semantic_type, SemanticType::Tag as i32);
901        assert_eq!(rows.schema[5].semantic_type, SemanticType::Tag as i32);
902        assert_eq!(rows.rows.len(), 2);
903
904        let first = &rows.rows[0];
905        assert_eq!(first.values.len(), rows.schema.len());
906        assert_eq!(row_string_value(first, 1), Some("line one"));
907        assert_eq!(row_string_value(first, 3), Some("api"));
908        assert_eq!(row_string_value(first, 4), Some("prod"));
909        assert!(first.values[5].value_data.is_none());
910
911        let second = &rows.rows[1];
912        assert_eq!(second.values.len(), rows.schema.len());
913        assert_eq!(row_string_value(second, 1), Some("line two"));
914        assert_eq!(row_string_value(second, 3), Some("worker"));
915        assert!(second.values[4].value_data.is_none());
916        assert_eq!(row_string_value(second, 5), Some("worker-0"));
917    }
918
919    #[test]
920    fn test_json_pipeline_conversion_names_loki_fields() {
921        let items = extract_item::<LokiPipeline>(
922            JSON_CONTENT_TYPE.clone(),
923            Bytes::from_static(JSON_PAYLOAD),
924        )
925        .unwrap()
926        .collect::<Vec<_>>();
927
928        assert_eq!(items.len(), 2);
929        let VrlValue::Object(map) = &items[0].map else {
930            panic!("expected pipeline object");
931        };
932        assert!(matches!(
933            map.get(&KeyString::from(greptime_timestamp())),
934            Some(VrlValue::Timestamp(_))
935        ));
936        assert_eq!(
937            pipeline_bytes_value(map, LOKI_LINE_COLUMN_NAME),
938            Some("line one".to_string())
939        );
940        assert_eq!(
941            pipeline_bytes_value(map, "loki_label_job"),
942            Some("api".to_string())
943        );
944        assert_eq!(
945            pipeline_bytes_value(map, "loki_label_namespace"),
946            Some("prod".to_string())
947        );
948        assert_eq!(
949            pipeline_bytes_value(map, "loki_metadata_trace_id"),
950            Some("abc".to_string())
951        );
952    }
953
954    #[test]
955    fn test_protobuf_parser_decodes_snappy_push_request() {
956        let request = PushRequest {
957            streams: vec![StreamAdapter {
958                labels: r#"{job="api"}"#.to_string(),
959                entries: vec![EntryAdapter {
960                    timestamp: Some(Timestamp {
961                        seconds: 1731748568,
962                        nanos: 804293888,
963                    }),
964                    line: "line one".to_string(),
965                    structured_metadata: vec![LabelPairAdapter {
966                        name: "trace_id".to_string(),
967                        value: "abc".to_string(),
968                    }],
969                    ..Default::default()
970                }],
971                ..Default::default()
972            }],
973        };
974        let bytes = snappy_compress(&request.encode_to_vec()).unwrap();
975
976        let items = extract_item::<LokiRawItem>(PB_CONTENT_TYPE.clone(), Bytes::from(bytes))
977            .unwrap()
978            .collect::<Vec<_>>();
979
980        assert_eq!(items.len(), 1);
981        assert_eq!(items[0].ts, 1731748568804293888);
982        assert_eq!(items[0].line, "line one");
983        assert_eq!(
984            items[0].labels.as_ref().unwrap().get("job"),
985            Some(&"api".to_string())
986        );
987        assert!(!items[0].structured_metadata.is_empty());
988    }
989
990    #[test]
991    fn test_protobuf_parser_rejects_invalid_snappy_payload() {
992        let err = match LokiPbParser::from_bytes(Bytes::from_static(b"not-snappy")) {
993            Ok(_) => panic!("expected invalid snappy payload to fail"),
994            Err(err) => err,
995        };
996
997        assert!(matches!(err, DecompressSnappyLokiRequest { .. }));
998    }
999
1000    #[test]
1001    fn test_parse_loki_labels() {
1002        let mut expected = BTreeMap::new();
1003        expected.insert("job".to_string(), "foobar".to_string());
1004        expected.insert("cluster".to_string(), "foo-central1".to_string());
1005        expected.insert("namespace".to_string(), "bar".to_string());
1006        expected.insert("container_name".to_string(), "buzz".to_string());
1007
1008        // perfect case
1009        let valid_labels =
1010            r#"{job="foobar", cluster="foo-central1", namespace="bar", container_name="buzz"}"#;
1011        let re = parse_loki_labels(valid_labels);
1012        assert!(re.is_ok());
1013        assert_eq!(re.unwrap(), expected);
1014
1015        // too short
1016        let too_short = r#"}"#;
1017        let re = parse_loki_labels(too_short);
1018        assert!(matches!(re.err().unwrap(), InvalidLokiLabels { .. }));
1019
1020        // missing start
1021        let missing_start = r#"job="foobar"}"#;
1022        let re = parse_loki_labels(missing_start);
1023        assert!(matches!(re.err().unwrap(), InvalidLokiLabels { .. }));
1024
1025        // missing start
1026        let missing_end = r#"{job="foobar""#;
1027        let re = parse_loki_labels(missing_end);
1028        assert!(matches!(re.err().unwrap(), InvalidLokiLabels { .. }));
1029
1030        // missing equal
1031        let missing_equal = r#"{job"foobar"}"#;
1032        let re = parse_loki_labels(missing_equal);
1033        assert!(matches!(re.err().unwrap(), InvalidLokiLabels { .. }));
1034
1035        // missing quote
1036        let missing_quote = r#"{job=foobar}"#;
1037        let re = parse_loki_labels(missing_quote);
1038        assert!(matches!(re.err().unwrap(), InvalidLokiLabels { .. }));
1039
1040        // missing comma
1041        let missing_comma = r#"{job="foobar" cluster="foo-central1"}"#;
1042        let re = parse_loki_labels(missing_comma);
1043        assert!(matches!(re.err().unwrap(), InvalidLokiLabels { .. }));
1044    }
1045}