servers/http/
loki.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, VecDeque};
16use std::sync::Arc;
17use std::time::Instant;
18
19use api::v1::value::ValueData;
20use api::v1::{
21    ColumnDataType, ColumnDataTypeExtension, ColumnSchema, JsonTypeExtension, Row,
22    RowInsertRequest, Rows, SemanticType, Value as GreptimeValue,
23};
24use axum::extract::State;
25use axum::Extension;
26use axum_extra::TypedHeader;
27use bytes::Bytes;
28use chrono::DateTime;
29use common_query::prelude::GREPTIME_TIMESTAMP;
30use common_query::{Output, OutputData};
31use common_telemetry::{error, warn};
32use headers::ContentType;
33use jsonb::Value;
34use lazy_static::lazy_static;
35use loki_proto::logproto::LabelPairAdapter;
36use loki_proto::prost_types::Timestamp as LokiTimestamp;
37use pipeline::util::to_pipeline_version;
38use pipeline::{ContextReq, PipelineContext, PipelineDefinition, SchemaInfo};
39use prost::Message;
40use quoted_string::test_utils::TestSpec;
41use session::context::{Channel, QueryContext};
42use snafu::{ensure, OptionExt, ResultExt};
43use vrl::value::{KeyString, Value as VrlValue};
44
45use crate::error::{
46    DecodeOtlpRequestSnafu, InvalidLokiLabelsSnafu, InvalidLokiPayloadSnafu, ParseJsonSnafu,
47    PipelineSnafu, Result, UnsupportedContentTypeSnafu,
48};
49use crate::http::event::{LogState, PipelineIngestRequest, JSON_CONTENT_TYPE, PB_CONTENT_TYPE};
50use crate::http::extractor::{LogTableName, PipelineInfo};
51use crate::http::result::greptime_result_v1::GreptimedbV1Response;
52use crate::http::HttpResponse;
53use crate::metrics::{
54    METRIC_FAILURE_VALUE, METRIC_LOKI_LOGS_INGESTION_COUNTER, METRIC_LOKI_LOGS_INGESTION_ELAPSED,
55    METRIC_SUCCESS_VALUE,
56};
57use crate::pipeline::run_pipeline;
58use crate::prom_store;
59
/// Table that receives the logs when the request does not name one.
const LOKI_TABLE_NAME: &str = "loki_logs";
/// Column holding the log line text on the raw (non-pipeline) ingestion path.
const LOKI_LINE_COLUMN: &str = "line";
/// Column holding the structured metadata as binary JSON on the raw ingestion path.
const LOKI_STRUCTURED_METADATA_COLUMN: &str = "structured_metadata";

/// Key under which the log line is handed to the pipeline engine.
const LOKI_LINE_COLUMN_NAME: &str = "loki_line";

/// Prefix put in front of every structured metadata key handed to the pipeline engine.
const LOKI_PIPELINE_METADATA_PREFIX: &str = "loki_metadata_";
/// Prefix put in front of every stream label name handed to the pipeline engine.
const LOKI_PIPELINE_LABEL_PREFIX: &str = "loki_label_";

/// Key of the stream list in the JSON push payload.
const STREAMS_KEY: &str = "streams";
/// Key of the label object inside one JSON stream.
const LABEL_KEY: &str = "stream";
/// Key of the log line list inside one JSON stream.
const LINES_KEY: &str = "values";
72
73lazy_static! {
74    static ref LOKI_INIT_SCHEMAS: Vec<ColumnSchema> = vec![
75        ColumnSchema {
76            column_name: GREPTIME_TIMESTAMP.to_string(),
77            datatype: ColumnDataType::TimestampNanosecond.into(),
78            semantic_type: SemanticType::Timestamp.into(),
79            datatype_extension: None,
80            options: None,
81        },
82        ColumnSchema {
83            column_name: LOKI_LINE_COLUMN.to_string(),
84            datatype: ColumnDataType::String.into(),
85            semantic_type: SemanticType::Field.into(),
86            datatype_extension: None,
87            options: None,
88        },
89        ColumnSchema {
90            column_name: LOKI_STRUCTURED_METADATA_COLUMN.to_string(),
91            datatype: ColumnDataType::Binary.into(),
92            semantic_type: SemanticType::Field.into(),
93            datatype_extension: Some(ColumnDataTypeExtension {
94                type_ext: Some(api::v1::column_data_type_extension::TypeExt::JsonType(
95                    JsonTypeExtension::JsonBinary.into()
96                ))
97            }),
98            options: None,
99        }
100    ];
101}
102
103#[axum_macros::debug_handler]
104pub async fn loki_ingest(
105    State(log_state): State<LogState>,
106    Extension(mut ctx): Extension<QueryContext>,
107    TypedHeader(content_type): TypedHeader<ContentType>,
108    LogTableName(table_name): LogTableName,
109    pipeline_info: PipelineInfo,
110    bytes: Bytes,
111) -> Result<HttpResponse> {
112    ctx.set_channel(Channel::Loki);
113    let ctx = Arc::new(ctx);
114    let table_name = table_name.unwrap_or_else(|| LOKI_TABLE_NAME.to_string());
115    let db = ctx.get_db_string();
116    let db_str = db.as_str();
117    let exec_timer = Instant::now();
118
119    let handler = log_state.log_handler;
120
121    let ctx_req = if let Some(pipeline_name) = pipeline_info.pipeline_name {
122        // go pipeline
123        let version = to_pipeline_version(pipeline_info.pipeline_version.as_deref())
124            .context(PipelineSnafu)?;
125        let def =
126            PipelineDefinition::from_name(&pipeline_name, version, None).context(PipelineSnafu)?;
127        let pipeline_ctx =
128            PipelineContext::new(&def, &pipeline_info.pipeline_params, Channel::Loki);
129
130        let v = extract_item::<LokiPipeline>(content_type, bytes)?
131            .map(|i| i.map)
132            .collect::<Vec<_>>();
133
134        let req = PipelineIngestRequest {
135            table: table_name,
136            values: v,
137        };
138
139        run_pipeline(&handler, &pipeline_ctx, req, &ctx, true).await?
140    } else {
141        // init schemas
142        let mut schema_info = SchemaInfo::from_schema_list(LOKI_INIT_SCHEMAS.clone());
143        let mut rows = Vec::with_capacity(256);
144        for loki_row in extract_item::<LokiRawItem>(content_type, bytes)? {
145            let mut row = init_row(
146                schema_info.schema.len(),
147                loki_row.ts,
148                loki_row.line,
149                loki_row.structured_metadata,
150            );
151            process_labels(&mut schema_info, &mut row, loki_row.labels);
152            rows.push(row);
153        }
154
155        let schemas = schema_info.schema;
156        // fill Null for missing values
157        for row in rows.iter_mut() {
158            row.resize(schemas.len(), GreptimeValue::default());
159        }
160        let rows = Rows {
161            rows: rows.into_iter().map(|values| Row { values }).collect(),
162            schema: schemas,
163        };
164        let ins_req = RowInsertRequest {
165            table_name,
166            rows: Some(rows),
167        };
168
169        ContextReq::default_opt_with_reqs(vec![ins_req])
170    };
171
172    let mut outputs = Vec::with_capacity(ctx_req.map_len());
173    for (temp_ctx, req) in ctx_req.as_req_iter(ctx) {
174        let output = handler.insert(req, temp_ctx).await;
175
176        if let Ok(Output {
177            data: OutputData::AffectedRows(rows),
178            meta: _,
179        }) = &output
180        {
181            METRIC_LOKI_LOGS_INGESTION_COUNTER
182                .with_label_values(&[db_str])
183                .inc_by(*rows as u64);
184            METRIC_LOKI_LOGS_INGESTION_ELAPSED
185                .with_label_values(&[db_str, METRIC_SUCCESS_VALUE])
186                .observe(exec_timer.elapsed().as_secs_f64());
187        } else {
188            METRIC_LOKI_LOGS_INGESTION_ELAPSED
189                .with_label_values(&[db_str, METRIC_FAILURE_VALUE])
190                .observe(exec_timer.elapsed().as_secs_f64());
191        }
192        outputs.push(output);
193    }
194
195    let response = GreptimedbV1Response::from_output(outputs)
196        .await
197        .with_execution_time(exec_timer.elapsed().as_millis() as u64);
198    Ok(response)
199}
200
/// Holder of one Loki log line parsed from either JSON or protobuf.
///
/// The generic parameter is the structured metadata as found in the payload: a `VrlValue`
/// for JSON, a `Vec<LabelPairAdapter>` for protobuf. Depending on the target destination
/// the item is converted into a [`LokiRawItem`] or a [`LokiPipeline`].
#[derive(Debug)]
pub struct LokiMiddleItem<T> {
    /// Timestamp of the line, in nanoseconds.
    pub ts: i64,
    /// The log line text.
    pub line: String,
    /// Structured metadata attached to the line, if any, still in its payload form.
    pub structured_metadata: Option<T>,
    /// Labels of the stream the line belongs to; `None` when they are absent or unusable.
    pub labels: Option<BTreeMap<String, String>>,
}
210
/// Line item for the raw Loki ingestion.
///
/// The line is persisted as a whole, the labels become tags and the structured metadata is
/// stored as one big JSON value.
#[derive(Debug)]
pub struct LokiRawItem {
    /// Timestamp of the line, in nanoseconds.
    pub ts: i64,
    /// The log line text.
    pub line: String,
    /// Structured metadata already encoded as binary JSON.
    pub structured_metadata: Vec<u8>,
    /// Labels of the stream the line belongs to; `None` when they are absent or unusable.
    pub labels: Option<BTreeMap<String, String>>,
}
220
221/// This is the line item prepared for the pipeline engine.
222pub struct LokiPipeline {
223    pub map: VrlValue,
224}
225
226/// This is the flow of the Loki ingestion.
227/// +--------+
228/// | bytes  |
229/// +--------+
230///     |
231/// +----------------------+----------------------+
232/// |                      |                      |
233/// |   JSON content type  |   PB content type    |
234/// +----------------------+----------------------+
235/// |                      |                      |
236/// | JsonStreamItem       | PbStreamItem         |
237/// | stream: serde_json   | stream: adapter      |
238/// +----------------------+----------------------+
239/// |                      |                      |
240/// | MiddleItem<serde_json> | MiddleItem<entry>  |
241/// +----------------------+----------------------+
242///           \                  /
243///            \                /
244///             \              /
245///         +----------------------+
246///         |   MiddleItem<T>      |
247///         +----------------------+
248///                 |
249///     +----------------+----------------+
250///     |                                 |
251/// +------------------+         +---------------------+
252/// |   LokiRawItem    |         |  LokiPipelineItem   |
253/// +------------------+         +---------------------+
254///           |                             |
255/// +------------------+         +---------------------+
256/// |   Loki ingest    |         |   run_pipeline      |
257/// +------------------+         +---------------------+
258fn extract_item<T>(content_type: ContentType, bytes: Bytes) -> Result<Box<dyn Iterator<Item = T>>>
259where
260    LokiMiddleItem<VrlValue>: Into<T>,
261    LokiMiddleItem<Vec<LabelPairAdapter>>: Into<T>,
262{
263    match content_type {
264        x if x == *JSON_CONTENT_TYPE => Ok(Box::new(
265            LokiJsonParser::from_bytes(bytes)?.flat_map(|item| item.into_iter().map(|i| i.into())),
266        )),
267        x if x == *PB_CONTENT_TYPE => Ok(Box::new(
268            LokiPbParser::from_bytes(bytes)?.flat_map(|item| item.into_iter().map(|i| i.into())),
269        )),
270        _ => UnsupportedContentTypeSnafu { content_type }.fail(),
271    }
272}
273
274struct LokiJsonParser {
275    pub streams: VecDeque<VrlValue>,
276}
277
278impl LokiJsonParser {
279    pub fn from_bytes(bytes: Bytes) -> Result<Self> {
280        let payload: VrlValue = serde_json::from_slice(bytes.as_ref()).context(ParseJsonSnafu)?;
281
282        let VrlValue::Object(mut map) = payload else {
283            return InvalidLokiPayloadSnafu {
284                msg: "payload is not an object",
285            }
286            .fail();
287        };
288
289        let streams = map.remove(STREAMS_KEY).context(InvalidLokiPayloadSnafu {
290            msg: "missing streams",
291        })?;
292
293        let VrlValue::Array(streams) = streams else {
294            return InvalidLokiPayloadSnafu {
295                msg: "streams is not an array",
296            }
297            .fail();
298        };
299
300        Ok(Self {
301            streams: streams.into(),
302        })
303    }
304}
305
306impl Iterator for LokiJsonParser {
307    type Item = JsonStreamItem;
308
309    fn next(&mut self) -> Option<Self::Item> {
310        while let Some(stream) = self.streams.pop_front() {
311            // get lines from the map
312            let VrlValue::Object(mut map) = stream else {
313                warn!("stream is not an object, {:?}", stream);
314                continue;
315            };
316            let Some(lines) = map.remove(LINES_KEY) else {
317                warn!("missing lines on stream, {:?}", map);
318                continue;
319            };
320            let VrlValue::Array(lines) = lines else {
321                warn!("lines is not an array, {:?}", lines);
322                continue;
323            };
324
325            // get labels
326            let labels = map
327                .remove(LABEL_KEY)
328                .and_then(|m| match m {
329                    VrlValue::Object(labels) => Some(labels),
330                    _ => None,
331                })
332                .map(|m| {
333                    m.into_iter()
334                        .filter_map(|(k, v)| match v {
335                            VrlValue::Bytes(v) => {
336                                Some((k.into(), String::from_utf8_lossy(&v).to_string()))
337                            }
338                            _ => None,
339                        })
340                        .collect::<BTreeMap<String, String>>()
341                });
342
343            return Some(JsonStreamItem {
344                lines: lines.into(),
345                labels,
346            });
347        }
348        None
349    }
350}
351
352struct JsonStreamItem {
353    pub lines: VecDeque<VrlValue>,
354    pub labels: Option<BTreeMap<String, String>>,
355}
356
357impl Iterator for JsonStreamItem {
358    type Item = LokiMiddleItem<VrlValue>;
359
360    fn next(&mut self) -> Option<Self::Item> {
361        while let Some(line) = self.lines.pop_front() {
362            let VrlValue::Array(line) = line else {
363                warn!("line is not an array, {:?}", line);
364                continue;
365            };
366            if line.len() < 2 {
367                warn!("line is too short, {:?}", line);
368                continue;
369            }
370            let mut line: VecDeque<VrlValue> = line.into();
371
372            // get ts
373            let ts = line.pop_front().and_then(|ts| match ts {
374                VrlValue::Bytes(ts) => String::from_utf8_lossy(&ts).parse::<i64>().ok(),
375                _ => {
376                    warn!("missing or invalid timestamp, {:?}", ts);
377                    None
378                }
379            });
380            let Some(ts) = ts else {
381                continue;
382            };
383
384            let line_text = line.pop_front().and_then(|l| match l {
385                VrlValue::Bytes(l) => Some(String::from_utf8_lossy(&l).to_string()),
386                _ => {
387                    warn!("missing or invalid line, {:?}", l);
388                    None
389                }
390            });
391            let Some(line_text) = line_text else {
392                continue;
393            };
394
395            let structured_metadata = line.pop_front();
396
397            return Some(LokiMiddleItem {
398                ts,
399                line: line_text,
400                structured_metadata,
401                labels: self.labels.clone(),
402            });
403        }
404        None
405    }
406}
407
408impl From<LokiMiddleItem<VrlValue>> for LokiRawItem {
409    fn from(val: LokiMiddleItem<VrlValue>) -> Self {
410        let LokiMiddleItem {
411            ts,
412            line,
413            structured_metadata,
414            labels,
415        } = val;
416
417        let structured_metadata = structured_metadata
418            .and_then(|m| match m {
419                VrlValue::Object(m) => Some(m),
420                _ => None,
421            })
422            .map(|m| {
423                m.into_iter()
424                    .filter_map(|(k, v)| match v {
425                        VrlValue::Bytes(bytes) => Some((
426                            k.into(),
427                            Value::String(String::from_utf8_lossy(&bytes).to_string().into()),
428                        )),
429                        _ => None,
430                    })
431                    .collect::<BTreeMap<String, Value>>()
432            })
433            .unwrap_or_default();
434        let structured_metadata = Value::Object(structured_metadata).to_vec();
435
436        LokiRawItem {
437            ts,
438            line,
439            structured_metadata,
440            labels,
441        }
442    }
443}
444
445impl From<LokiMiddleItem<VrlValue>> for LokiPipeline {
446    fn from(value: LokiMiddleItem<VrlValue>) -> Self {
447        let LokiMiddleItem {
448            ts,
449            line,
450            structured_metadata,
451            labels,
452        } = value;
453
454        let mut map = BTreeMap::new();
455        map.insert(
456            KeyString::from(GREPTIME_TIMESTAMP),
457            VrlValue::Timestamp(DateTime::from_timestamp_nanos(ts)),
458        );
459        map.insert(
460            KeyString::from(LOKI_LINE_COLUMN_NAME),
461            VrlValue::Bytes(line.into()),
462        );
463
464        if let Some(VrlValue::Object(m)) = structured_metadata {
465            for (k, v) in m {
466                map.insert(
467                    KeyString::from(format!("{}{}", LOKI_PIPELINE_METADATA_PREFIX, k)),
468                    v,
469                );
470            }
471        }
472        if let Some(v) = labels {
473            v.into_iter().for_each(|(k, v)| {
474                map.insert(
475                    KeyString::from(format!("{}{}", LOKI_PIPELINE_LABEL_PREFIX, k)),
476                    VrlValue::Bytes(v.into()),
477                );
478            });
479        }
480
481        LokiPipeline {
482            map: VrlValue::Object(map),
483        }
484    }
485}
486
487pub struct LokiPbParser {
488    pub streams: VecDeque<loki_proto::logproto::StreamAdapter>,
489}
490
491impl LokiPbParser {
492    pub fn from_bytes(bytes: Bytes) -> Result<Self> {
493        let decompressed = prom_store::snappy_decompress(&bytes).unwrap();
494        let req = loki_proto::logproto::PushRequest::decode(&decompressed[..])
495            .context(DecodeOtlpRequestSnafu)?;
496
497        Ok(Self {
498            streams: req.streams.into(),
499        })
500    }
501}
502
503impl Iterator for LokiPbParser {
504    type Item = PbStreamItem;
505
506    fn next(&mut self) -> Option<Self::Item> {
507        let stream = self.streams.pop_front()?;
508
509        let labels = parse_loki_labels(&stream.labels)
510            .inspect_err(|e| {
511                error!(e; "failed to parse loki labels, {:?}", stream.labels);
512            })
513            .ok();
514
515        Some(PbStreamItem {
516            entries: stream.entries.into(),
517            labels,
518        })
519    }
520}
521
522pub struct PbStreamItem {
523    pub entries: VecDeque<loki_proto::logproto::EntryAdapter>,
524    pub labels: Option<BTreeMap<String, String>>,
525}
526
527impl Iterator for PbStreamItem {
528    type Item = LokiMiddleItem<Vec<LabelPairAdapter>>;
529
530    fn next(&mut self) -> Option<Self::Item> {
531        while let Some(entry) = self.entries.pop_front() {
532            let ts = if let Some(ts) = entry.timestamp {
533                ts
534            } else {
535                warn!("missing timestamp, {:?}", entry);
536                continue;
537            };
538            let line = entry.line;
539
540            let structured_metadata = entry.structured_metadata;
541
542            return Some(LokiMiddleItem {
543                ts: prost_ts_to_nano(&ts),
544                line,
545                structured_metadata: Some(structured_metadata),
546                labels: self.labels.clone(),
547            });
548        }
549        None
550    }
551}
552
553impl From<LokiMiddleItem<Vec<LabelPairAdapter>>> for LokiRawItem {
554    fn from(val: LokiMiddleItem<Vec<LabelPairAdapter>>) -> Self {
555        let LokiMiddleItem {
556            ts,
557            line,
558            structured_metadata,
559            labels,
560        } = val;
561
562        let structured_metadata = structured_metadata
563            .unwrap_or_default()
564            .into_iter()
565            .map(|d| (d.name, Value::String(d.value.into())))
566            .collect::<BTreeMap<String, Value>>();
567        let structured_metadata = Value::Object(structured_metadata).to_vec();
568
569        LokiRawItem {
570            ts,
571            line,
572            structured_metadata,
573            labels,
574        }
575    }
576}
577
578impl From<LokiMiddleItem<Vec<LabelPairAdapter>>> for LokiPipeline {
579    fn from(value: LokiMiddleItem<Vec<LabelPairAdapter>>) -> Self {
580        let LokiMiddleItem {
581            ts,
582            line,
583            structured_metadata,
584            labels,
585        } = value;
586
587        let mut map = BTreeMap::new();
588        map.insert(
589            KeyString::from(GREPTIME_TIMESTAMP),
590            VrlValue::Timestamp(DateTime::from_timestamp_nanos(ts)),
591        );
592        map.insert(
593            KeyString::from(LOKI_LINE_COLUMN_NAME),
594            VrlValue::Bytes(line.into()),
595        );
596
597        structured_metadata
598            .unwrap_or_default()
599            .into_iter()
600            .for_each(|d| {
601                map.insert(
602                    KeyString::from(format!("{}{}", LOKI_PIPELINE_METADATA_PREFIX, d.name)),
603                    VrlValue::Bytes(d.value.into()),
604                );
605            });
606
607        if let Some(v) = labels {
608            v.into_iter().for_each(|(k, v)| {
609                map.insert(
610                    KeyString::from(format!("{}{}", LOKI_PIPELINE_LABEL_PREFIX, k)),
611                    VrlValue::Bytes(v.into()),
612                );
613            });
614        }
615
616        LokiPipeline {
617            map: VrlValue::Object(map),
618        }
619    }
620}
621
622/// since we're hand-parsing the labels, if any error is encountered, we'll just skip the label
623/// note: pub here for bench usage
624/// ref:
625/// 1. encoding: https://github.com/grafana/alloy/blob/be34410b9e841cc0c37c153f9550d9086a304bca/internal/component/common/loki/client/batch.go#L114-L145
626/// 2. test data: https://github.com/grafana/loki/blob/a24ef7b206e0ca63ee74ca6ecb0a09b745cd2258/pkg/push/types_test.go
627pub fn parse_loki_labels(labels: &str) -> Result<BTreeMap<String, String>> {
628    let mut labels = labels.trim();
629    ensure!(
630        labels.len() >= 2,
631        InvalidLokiLabelsSnafu {
632            msg: "labels string too short"
633        }
634    );
635    ensure!(
636        labels.starts_with("{"),
637        InvalidLokiLabelsSnafu {
638            msg: "missing `{` at the beginning"
639        }
640    );
641    ensure!(
642        labels.ends_with("}"),
643        InvalidLokiLabelsSnafu {
644            msg: "missing `}` at the end"
645        }
646    );
647
648    let mut result = BTreeMap::new();
649    labels = &labels[1..labels.len() - 1];
650
651    while !labels.is_empty() {
652        // parse key
653        let first_index = labels.find("=").with_context(|| InvalidLokiLabelsSnafu {
654            msg: format!("missing `=` near: {}", labels),
655        })?;
656        let key = &labels[..first_index];
657        labels = &labels[first_index + 1..];
658
659        // parse value
660        let qs = quoted_string::parse::<TestSpec>(labels)
661            .map_err(|e| {
662                InvalidLokiLabelsSnafu {
663                    msg: format!(
664                        "failed to parse quoted string near: {}, reason: {}",
665                        labels, e.1
666                    ),
667                }
668                .build()
669            })?
670            .quoted_string;
671
672        labels = &labels[qs.len()..];
673
674        let value = quoted_string::to_content::<TestSpec>(qs).map_err(|e| {
675            InvalidLokiLabelsSnafu {
676                msg: format!("failed to unquote the string: {}, reason: {}", qs, e),
677            }
678            .build()
679        })?;
680
681        // insert key and value
682        result.insert(key.to_string(), value.to_string());
683
684        if labels.is_empty() {
685            break;
686        }
687        ensure!(
688            labels.starts_with(","),
689            InvalidLokiLabelsSnafu { msg: "missing `,`" }
690        );
691        labels = labels[1..].trim_start();
692    }
693
694    Ok(result)
695}
696
697#[inline]
698fn prost_ts_to_nano(ts: &LokiTimestamp) -> i64 {
699    ts.seconds * 1_000_000_000 + ts.nanos as i64
700}
701
702fn init_row(
703    schema_len: usize,
704    ts: i64,
705    line: String,
706    structured_metadata: Vec<u8>,
707) -> Vec<GreptimeValue> {
708    // create and init row
709    let mut row = Vec::with_capacity(schema_len);
710    // set ts and line
711    row.push(GreptimeValue {
712        value_data: Some(ValueData::TimestampNanosecondValue(ts)),
713    });
714    row.push(GreptimeValue {
715        value_data: Some(ValueData::StringValue(line)),
716    });
717    row.push(GreptimeValue {
718        value_data: Some(ValueData::BinaryValue(structured_metadata)),
719    });
720    for _ in 0..(schema_len - 3) {
721        row.push(GreptimeValue { value_data: None });
722    }
723    row
724}
725
726fn process_labels(
727    schema_info: &mut SchemaInfo,
728    row: &mut Vec<GreptimeValue>,
729    labels: Option<BTreeMap<String, String>>,
730) {
731    let Some(labels) = labels else {
732        return;
733    };
734
735    let column_indexer = &mut schema_info.index;
736    let schemas = &mut schema_info.schema;
737
738    // insert labels
739    for (k, v) in labels {
740        if let Some(index) = column_indexer.get(&k) {
741            // exist in schema
742            // insert value using index
743            row[*index] = GreptimeValue {
744                value_data: Some(ValueData::StringValue(v)),
745            };
746        } else {
747            // not exist
748            // add schema and append to values
749            schemas.push(ColumnSchema {
750                column_name: k.clone(),
751                datatype: ColumnDataType::String.into(),
752                semantic_type: SemanticType::Tag.into(),
753                datatype_extension: None,
754                options: None,
755            });
756            column_indexer.insert(k, schemas.len() - 1);
757
758            row.push(GreptimeValue {
759                value_data: Some(ValueData::StringValue(v)),
760            });
761        }
762    }
763}
764
#[cfg(test)]
mod tests {
    use std::collections::BTreeMap;

    use loki_proto::prost_types::Timestamp;

    use crate::error::Error::InvalidLokiLabels;
    use crate::http::loki::{parse_loki_labels, prost_ts_to_nano};

    /// 1731748568 seconds plus 804293888 nanoseconds is 1731748568804293888 nanoseconds.
    #[test]
    fn test_ts_to_nano() {
        let ts = Timestamp {
            seconds: 1731748568,
            nanos: 804293888,
        };
        assert_eq!(prost_ts_to_nano(&ts), 1731748568804293888);
    }

    /// A well-formed label string is parsed into a map; every malformed one is rejected.
    #[test]
    fn test_parse_loki_labels() {
        let expected: BTreeMap<String, String> = [
            ("job", "foobar"),
            ("cluster", "foo-central1"),
            ("namespace", "bar"),
            ("container_name", "buzz"),
        ]
        .into_iter()
        .map(|(k, v)| (k.to_string(), v.to_string()))
        .collect();

        // perfect case
        let valid_labels =
            r#"{job="foobar", cluster="foo-central1", namespace="bar", container_name="buzz"}"#;
        let re = parse_loki_labels(valid_labels);
        assert!(re.is_ok());
        assert_eq!(re.unwrap(), expected);

        let invalid_cases = [
            ("too short", r#"}"#),
            ("missing start", r#"job="foobar"}"#),
            ("missing end", r#"{job="foobar""#),
            ("missing equal", r#"{job"foobar"}"#),
            ("missing quote", r#"{job=foobar}"#),
            ("missing comma", r#"{job="foobar" cluster="foo-central1"}"#),
        ];
        for (case, input) in invalid_cases {
            let re = parse_loki_labels(input);
            assert!(
                matches!(re.err().unwrap(), InvalidLokiLabels { .. }),
                "case: {case}"
            );
        }
    }
}
831}