1use std::collections::{BTreeMap, VecDeque};
16use std::sync::Arc;
17use std::time::Instant;
18
19use api::v1::value::ValueData;
20use api::v1::{
21 ColumnDataType, ColumnDataTypeExtension, ColumnSchema, JsonTypeExtension, Row,
22 RowInsertRequest, Rows, SemanticType, Value as GreptimeValue,
23};
24use axum::extract::State;
25use axum::Extension;
26use axum_extra::TypedHeader;
27use bytes::Bytes;
28use chrono::DateTime;
29use common_query::prelude::GREPTIME_TIMESTAMP;
30use common_query::{Output, OutputData};
31use common_telemetry::{error, warn};
32use headers::ContentType;
33use jsonb::Value;
34use lazy_static::lazy_static;
35use loki_proto::logproto::LabelPairAdapter;
36use loki_proto::prost_types::Timestamp as LokiTimestamp;
37use pipeline::util::to_pipeline_version;
38use pipeline::{ContextReq, PipelineContext, PipelineDefinition, SchemaInfo};
39use prost::Message;
40use quoted_string::test_utils::TestSpec;
41use session::context::{Channel, QueryContext};
42use snafu::{ensure, OptionExt, ResultExt};
43use vrl::value::{KeyString, Value as VrlValue};
44
45use crate::error::{
46 DecodeOtlpRequestSnafu, InvalidLokiLabelsSnafu, InvalidLokiPayloadSnafu, ParseJsonSnafu,
47 PipelineSnafu, Result, UnsupportedContentTypeSnafu,
48};
49use crate::http::event::{LogState, PipelineIngestRequest, JSON_CONTENT_TYPE, PB_CONTENT_TYPE};
50use crate::http::extractor::{LogTableName, PipelineInfo};
51use crate::http::result::greptime_result_v1::GreptimedbV1Response;
52use crate::http::HttpResponse;
53use crate::metrics::{
54 METRIC_FAILURE_VALUE, METRIC_LOKI_LOGS_INGESTION_COUNTER, METRIC_LOKI_LOGS_INGESTION_ELAPSED,
55 METRIC_SUCCESS_VALUE,
56};
57use crate::pipeline::run_pipeline;
58use crate::prom_store;
59
60const LOKI_TABLE_NAME: &str = "loki_logs";
61const LOKI_LINE_COLUMN: &str = "line";
62const LOKI_STRUCTURED_METADATA_COLUMN: &str = "structured_metadata";
63
64const LOKI_LINE_COLUMN_NAME: &str = "loki_line";
65
66const LOKI_PIPELINE_METADATA_PREFIX: &str = "loki_metadata_";
67const LOKI_PIPELINE_LABEL_PREFIX: &str = "loki_label_";
68
69const STREAMS_KEY: &str = "streams";
70const LABEL_KEY: &str = "stream";
71const LINES_KEY: &str = "values";
72
73lazy_static! {
74 static ref LOKI_INIT_SCHEMAS: Vec<ColumnSchema> = vec![
75 ColumnSchema {
76 column_name: GREPTIME_TIMESTAMP.to_string(),
77 datatype: ColumnDataType::TimestampNanosecond.into(),
78 semantic_type: SemanticType::Timestamp.into(),
79 datatype_extension: None,
80 options: None,
81 },
82 ColumnSchema {
83 column_name: LOKI_LINE_COLUMN.to_string(),
84 datatype: ColumnDataType::String.into(),
85 semantic_type: SemanticType::Field.into(),
86 datatype_extension: None,
87 options: None,
88 },
89 ColumnSchema {
90 column_name: LOKI_STRUCTURED_METADATA_COLUMN.to_string(),
91 datatype: ColumnDataType::Binary.into(),
92 semantic_type: SemanticType::Field.into(),
93 datatype_extension: Some(ColumnDataTypeExtension {
94 type_ext: Some(api::v1::column_data_type_extension::TypeExt::JsonType(
95 JsonTypeExtension::JsonBinary.into()
96 ))
97 }),
98 options: None,
99 }
100 ];
101}
102
103#[axum_macros::debug_handler]
104pub async fn loki_ingest(
105 State(log_state): State<LogState>,
106 Extension(mut ctx): Extension<QueryContext>,
107 TypedHeader(content_type): TypedHeader<ContentType>,
108 LogTableName(table_name): LogTableName,
109 pipeline_info: PipelineInfo,
110 bytes: Bytes,
111) -> Result<HttpResponse> {
112 ctx.set_channel(Channel::Loki);
113 let ctx = Arc::new(ctx);
114 let table_name = table_name.unwrap_or_else(|| LOKI_TABLE_NAME.to_string());
115 let db = ctx.get_db_string();
116 let db_str = db.as_str();
117 let exec_timer = Instant::now();
118
119 let handler = log_state.log_handler;
120
121 let ctx_req = if let Some(pipeline_name) = pipeline_info.pipeline_name {
122 let version = to_pipeline_version(pipeline_info.pipeline_version.as_deref())
124 .context(PipelineSnafu)?;
125 let def =
126 PipelineDefinition::from_name(&pipeline_name, version, None).context(PipelineSnafu)?;
127 let pipeline_ctx =
128 PipelineContext::new(&def, &pipeline_info.pipeline_params, Channel::Loki);
129
130 let v = extract_item::<LokiPipeline>(content_type, bytes)?
131 .map(|i| i.map)
132 .collect::<Vec<_>>();
133
134 let req = PipelineIngestRequest {
135 table: table_name,
136 values: v,
137 };
138
139 run_pipeline(&handler, &pipeline_ctx, req, &ctx, true).await?
140 } else {
141 let mut schema_info = SchemaInfo::from_schema_list(LOKI_INIT_SCHEMAS.clone());
143 let mut rows = Vec::with_capacity(256);
144 for loki_row in extract_item::<LokiRawItem>(content_type, bytes)? {
145 let mut row = init_row(
146 schema_info.schema.len(),
147 loki_row.ts,
148 loki_row.line,
149 loki_row.structured_metadata,
150 );
151 process_labels(&mut schema_info, &mut row, loki_row.labels);
152 rows.push(row);
153 }
154
155 let schemas = schema_info.schema;
156 for row in rows.iter_mut() {
158 row.resize(schemas.len(), GreptimeValue::default());
159 }
160 let rows = Rows {
161 rows: rows.into_iter().map(|values| Row { values }).collect(),
162 schema: schemas,
163 };
164 let ins_req = RowInsertRequest {
165 table_name,
166 rows: Some(rows),
167 };
168
169 ContextReq::default_opt_with_reqs(vec![ins_req])
170 };
171
172 let mut outputs = Vec::with_capacity(ctx_req.map_len());
173 for (temp_ctx, req) in ctx_req.as_req_iter(ctx) {
174 let output = handler.insert(req, temp_ctx).await;
175
176 if let Ok(Output {
177 data: OutputData::AffectedRows(rows),
178 meta: _,
179 }) = &output
180 {
181 METRIC_LOKI_LOGS_INGESTION_COUNTER
182 .with_label_values(&[db_str])
183 .inc_by(*rows as u64);
184 METRIC_LOKI_LOGS_INGESTION_ELAPSED
185 .with_label_values(&[db_str, METRIC_SUCCESS_VALUE])
186 .observe(exec_timer.elapsed().as_secs_f64());
187 } else {
188 METRIC_LOKI_LOGS_INGESTION_ELAPSED
189 .with_label_values(&[db_str, METRIC_FAILURE_VALUE])
190 .observe(exec_timer.elapsed().as_secs_f64());
191 }
192 outputs.push(output);
193 }
194
195 let response = GreptimedbV1Response::from_output(outputs)
196 .await
197 .with_execution_time(exec_timer.elapsed().as_millis() as u64);
198 Ok(response)
199}
200
201pub struct LokiMiddleItem<T> {
205 pub ts: i64,
206 pub line: String,
207 pub structured_metadata: Option<T>,
208 pub labels: Option<BTreeMap<String, String>>,
209}
210
211pub struct LokiRawItem {
215 pub ts: i64,
216 pub line: String,
217 pub structured_metadata: Vec<u8>,
218 pub labels: Option<BTreeMap<String, String>>,
219}
220
221pub struct LokiPipeline {
223 pub map: VrlValue,
224}
225
226fn extract_item<T>(content_type: ContentType, bytes: Bytes) -> Result<Box<dyn Iterator<Item = T>>>
259where
260 LokiMiddleItem<VrlValue>: Into<T>,
261 LokiMiddleItem<Vec<LabelPairAdapter>>: Into<T>,
262{
263 match content_type {
264 x if x == *JSON_CONTENT_TYPE => Ok(Box::new(
265 LokiJsonParser::from_bytes(bytes)?.flat_map(|item| item.into_iter().map(|i| i.into())),
266 )),
267 x if x == *PB_CONTENT_TYPE => Ok(Box::new(
268 LokiPbParser::from_bytes(bytes)?.flat_map(|item| item.into_iter().map(|i| i.into())),
269 )),
270 _ => UnsupportedContentTypeSnafu { content_type }.fail(),
271 }
272}
273
274struct LokiJsonParser {
275 pub streams: VecDeque<VrlValue>,
276}
277
278impl LokiJsonParser {
279 pub fn from_bytes(bytes: Bytes) -> Result<Self> {
280 let payload: VrlValue = serde_json::from_slice(bytes.as_ref()).context(ParseJsonSnafu)?;
281
282 let VrlValue::Object(mut map) = payload else {
283 return InvalidLokiPayloadSnafu {
284 msg: "payload is not an object",
285 }
286 .fail();
287 };
288
289 let streams = map.remove(STREAMS_KEY).context(InvalidLokiPayloadSnafu {
290 msg: "missing streams",
291 })?;
292
293 let VrlValue::Array(streams) = streams else {
294 return InvalidLokiPayloadSnafu {
295 msg: "streams is not an array",
296 }
297 .fail();
298 };
299
300 Ok(Self {
301 streams: streams.into(),
302 })
303 }
304}
305
306impl Iterator for LokiJsonParser {
307 type Item = JsonStreamItem;
308
309 fn next(&mut self) -> Option<Self::Item> {
310 while let Some(stream) = self.streams.pop_front() {
311 let VrlValue::Object(mut map) = stream else {
313 warn!("stream is not an object, {:?}", stream);
314 continue;
315 };
316 let Some(lines) = map.remove(LINES_KEY) else {
317 warn!("missing lines on stream, {:?}", map);
318 continue;
319 };
320 let VrlValue::Array(lines) = lines else {
321 warn!("lines is not an array, {:?}", lines);
322 continue;
323 };
324
325 let labels = map
327 .remove(LABEL_KEY)
328 .and_then(|m| match m {
329 VrlValue::Object(labels) => Some(labels),
330 _ => None,
331 })
332 .map(|m| {
333 m.into_iter()
334 .filter_map(|(k, v)| match v {
335 VrlValue::Bytes(v) => {
336 Some((k.into(), String::from_utf8_lossy(&v).to_string()))
337 }
338 _ => None,
339 })
340 .collect::<BTreeMap<String, String>>()
341 });
342
343 return Some(JsonStreamItem {
344 lines: lines.into(),
345 labels,
346 });
347 }
348 None
349 }
350}
351
352struct JsonStreamItem {
353 pub lines: VecDeque<VrlValue>,
354 pub labels: Option<BTreeMap<String, String>>,
355}
356
357impl Iterator for JsonStreamItem {
358 type Item = LokiMiddleItem<VrlValue>;
359
360 fn next(&mut self) -> Option<Self::Item> {
361 while let Some(line) = self.lines.pop_front() {
362 let VrlValue::Array(line) = line else {
363 warn!("line is not an array, {:?}", line);
364 continue;
365 };
366 if line.len() < 2 {
367 warn!("line is too short, {:?}", line);
368 continue;
369 }
370 let mut line: VecDeque<VrlValue> = line.into();
371
372 let ts = line.pop_front().and_then(|ts| match ts {
374 VrlValue::Bytes(ts) => String::from_utf8_lossy(&ts).parse::<i64>().ok(),
375 _ => {
376 warn!("missing or invalid timestamp, {:?}", ts);
377 None
378 }
379 });
380 let Some(ts) = ts else {
381 continue;
382 };
383
384 let line_text = line.pop_front().and_then(|l| match l {
385 VrlValue::Bytes(l) => Some(String::from_utf8_lossy(&l).to_string()),
386 _ => {
387 warn!("missing or invalid line, {:?}", l);
388 None
389 }
390 });
391 let Some(line_text) = line_text else {
392 continue;
393 };
394
395 let structured_metadata = line.pop_front();
396
397 return Some(LokiMiddleItem {
398 ts,
399 line: line_text,
400 structured_metadata,
401 labels: self.labels.clone(),
402 });
403 }
404 None
405 }
406}
407
408impl From<LokiMiddleItem<VrlValue>> for LokiRawItem {
409 fn from(val: LokiMiddleItem<VrlValue>) -> Self {
410 let LokiMiddleItem {
411 ts,
412 line,
413 structured_metadata,
414 labels,
415 } = val;
416
417 let structured_metadata = structured_metadata
418 .and_then(|m| match m {
419 VrlValue::Object(m) => Some(m),
420 _ => None,
421 })
422 .map(|m| {
423 m.into_iter()
424 .filter_map(|(k, v)| match v {
425 VrlValue::Bytes(bytes) => Some((
426 k.into(),
427 Value::String(String::from_utf8_lossy(&bytes).to_string().into()),
428 )),
429 _ => None,
430 })
431 .collect::<BTreeMap<String, Value>>()
432 })
433 .unwrap_or_default();
434 let structured_metadata = Value::Object(structured_metadata).to_vec();
435
436 LokiRawItem {
437 ts,
438 line,
439 structured_metadata,
440 labels,
441 }
442 }
443}
444
445impl From<LokiMiddleItem<VrlValue>> for LokiPipeline {
446 fn from(value: LokiMiddleItem<VrlValue>) -> Self {
447 let LokiMiddleItem {
448 ts,
449 line,
450 structured_metadata,
451 labels,
452 } = value;
453
454 let mut map = BTreeMap::new();
455 map.insert(
456 KeyString::from(GREPTIME_TIMESTAMP),
457 VrlValue::Timestamp(DateTime::from_timestamp_nanos(ts)),
458 );
459 map.insert(
460 KeyString::from(LOKI_LINE_COLUMN_NAME),
461 VrlValue::Bytes(line.into()),
462 );
463
464 if let Some(VrlValue::Object(m)) = structured_metadata {
465 for (k, v) in m {
466 map.insert(
467 KeyString::from(format!("{}{}", LOKI_PIPELINE_METADATA_PREFIX, k)),
468 v,
469 );
470 }
471 }
472 if let Some(v) = labels {
473 v.into_iter().for_each(|(k, v)| {
474 map.insert(
475 KeyString::from(format!("{}{}", LOKI_PIPELINE_LABEL_PREFIX, k)),
476 VrlValue::Bytes(v.into()),
477 );
478 });
479 }
480
481 LokiPipeline {
482 map: VrlValue::Object(map),
483 }
484 }
485}
486
487pub struct LokiPbParser {
488 pub streams: VecDeque<loki_proto::logproto::StreamAdapter>,
489}
490
491impl LokiPbParser {
492 pub fn from_bytes(bytes: Bytes) -> Result<Self> {
493 let decompressed = prom_store::snappy_decompress(&bytes).unwrap();
494 let req = loki_proto::logproto::PushRequest::decode(&decompressed[..])
495 .context(DecodeOtlpRequestSnafu)?;
496
497 Ok(Self {
498 streams: req.streams.into(),
499 })
500 }
501}
502
503impl Iterator for LokiPbParser {
504 type Item = PbStreamItem;
505
506 fn next(&mut self) -> Option<Self::Item> {
507 let stream = self.streams.pop_front()?;
508
509 let labels = parse_loki_labels(&stream.labels)
510 .inspect_err(|e| {
511 error!(e; "failed to parse loki labels, {:?}", stream.labels);
512 })
513 .ok();
514
515 Some(PbStreamItem {
516 entries: stream.entries.into(),
517 labels,
518 })
519 }
520}
521
522pub struct PbStreamItem {
523 pub entries: VecDeque<loki_proto::logproto::EntryAdapter>,
524 pub labels: Option<BTreeMap<String, String>>,
525}
526
527impl Iterator for PbStreamItem {
528 type Item = LokiMiddleItem<Vec<LabelPairAdapter>>;
529
530 fn next(&mut self) -> Option<Self::Item> {
531 while let Some(entry) = self.entries.pop_front() {
532 let ts = if let Some(ts) = entry.timestamp {
533 ts
534 } else {
535 warn!("missing timestamp, {:?}", entry);
536 continue;
537 };
538 let line = entry.line;
539
540 let structured_metadata = entry.structured_metadata;
541
542 return Some(LokiMiddleItem {
543 ts: prost_ts_to_nano(&ts),
544 line,
545 structured_metadata: Some(structured_metadata),
546 labels: self.labels.clone(),
547 });
548 }
549 None
550 }
551}
552
553impl From<LokiMiddleItem<Vec<LabelPairAdapter>>> for LokiRawItem {
554 fn from(val: LokiMiddleItem<Vec<LabelPairAdapter>>) -> Self {
555 let LokiMiddleItem {
556 ts,
557 line,
558 structured_metadata,
559 labels,
560 } = val;
561
562 let structured_metadata = structured_metadata
563 .unwrap_or_default()
564 .into_iter()
565 .map(|d| (d.name, Value::String(d.value.into())))
566 .collect::<BTreeMap<String, Value>>();
567 let structured_metadata = Value::Object(structured_metadata).to_vec();
568
569 LokiRawItem {
570 ts,
571 line,
572 structured_metadata,
573 labels,
574 }
575 }
576}
577
578impl From<LokiMiddleItem<Vec<LabelPairAdapter>>> for LokiPipeline {
579 fn from(value: LokiMiddleItem<Vec<LabelPairAdapter>>) -> Self {
580 let LokiMiddleItem {
581 ts,
582 line,
583 structured_metadata,
584 labels,
585 } = value;
586
587 let mut map = BTreeMap::new();
588 map.insert(
589 KeyString::from(GREPTIME_TIMESTAMP),
590 VrlValue::Timestamp(DateTime::from_timestamp_nanos(ts)),
591 );
592 map.insert(
593 KeyString::from(LOKI_LINE_COLUMN_NAME),
594 VrlValue::Bytes(line.into()),
595 );
596
597 structured_metadata
598 .unwrap_or_default()
599 .into_iter()
600 .for_each(|d| {
601 map.insert(
602 KeyString::from(format!("{}{}", LOKI_PIPELINE_METADATA_PREFIX, d.name)),
603 VrlValue::Bytes(d.value.into()),
604 );
605 });
606
607 if let Some(v) = labels {
608 v.into_iter().for_each(|(k, v)| {
609 map.insert(
610 KeyString::from(format!("{}{}", LOKI_PIPELINE_LABEL_PREFIX, k)),
611 VrlValue::Bytes(v.into()),
612 );
613 });
614 }
615
616 LokiPipeline {
617 map: VrlValue::Object(map),
618 }
619 }
620}
621
622pub fn parse_loki_labels(labels: &str) -> Result<BTreeMap<String, String>> {
628 let mut labels = labels.trim();
629 ensure!(
630 labels.len() >= 2,
631 InvalidLokiLabelsSnafu {
632 msg: "labels string too short"
633 }
634 );
635 ensure!(
636 labels.starts_with("{"),
637 InvalidLokiLabelsSnafu {
638 msg: "missing `{` at the beginning"
639 }
640 );
641 ensure!(
642 labels.ends_with("}"),
643 InvalidLokiLabelsSnafu {
644 msg: "missing `}` at the end"
645 }
646 );
647
648 let mut result = BTreeMap::new();
649 labels = &labels[1..labels.len() - 1];
650
651 while !labels.is_empty() {
652 let first_index = labels.find("=").with_context(|| InvalidLokiLabelsSnafu {
654 msg: format!("missing `=` near: {}", labels),
655 })?;
656 let key = &labels[..first_index];
657 labels = &labels[first_index + 1..];
658
659 let qs = quoted_string::parse::<TestSpec>(labels)
661 .map_err(|e| {
662 InvalidLokiLabelsSnafu {
663 msg: format!(
664 "failed to parse quoted string near: {}, reason: {}",
665 labels, e.1
666 ),
667 }
668 .build()
669 })?
670 .quoted_string;
671
672 labels = &labels[qs.len()..];
673
674 let value = quoted_string::to_content::<TestSpec>(qs).map_err(|e| {
675 InvalidLokiLabelsSnafu {
676 msg: format!("failed to unquote the string: {}, reason: {}", qs, e),
677 }
678 .build()
679 })?;
680
681 result.insert(key.to_string(), value.to_string());
683
684 if labels.is_empty() {
685 break;
686 }
687 ensure!(
688 labels.starts_with(","),
689 InvalidLokiLabelsSnafu { msg: "missing `,`" }
690 );
691 labels = labels[1..].trim_start();
692 }
693
694 Ok(result)
695}
696
697#[inline]
698fn prost_ts_to_nano(ts: &LokiTimestamp) -> i64 {
699 ts.seconds * 1_000_000_000 + ts.nanos as i64
700}
701
702fn init_row(
703 schema_len: usize,
704 ts: i64,
705 line: String,
706 structured_metadata: Vec<u8>,
707) -> Vec<GreptimeValue> {
708 let mut row = Vec::with_capacity(schema_len);
710 row.push(GreptimeValue {
712 value_data: Some(ValueData::TimestampNanosecondValue(ts)),
713 });
714 row.push(GreptimeValue {
715 value_data: Some(ValueData::StringValue(line)),
716 });
717 row.push(GreptimeValue {
718 value_data: Some(ValueData::BinaryValue(structured_metadata)),
719 });
720 for _ in 0..(schema_len - 3) {
721 row.push(GreptimeValue { value_data: None });
722 }
723 row
724}
725
726fn process_labels(
727 schema_info: &mut SchemaInfo,
728 row: &mut Vec<GreptimeValue>,
729 labels: Option<BTreeMap<String, String>>,
730) {
731 let Some(labels) = labels else {
732 return;
733 };
734
735 let column_indexer = &mut schema_info.index;
736 let schemas = &mut schema_info.schema;
737
738 for (k, v) in labels {
740 if let Some(index) = column_indexer.get(&k) {
741 row[*index] = GreptimeValue {
744 value_data: Some(ValueData::StringValue(v)),
745 };
746 } else {
747 schemas.push(ColumnSchema {
750 column_name: k.clone(),
751 datatype: ColumnDataType::String.into(),
752 semantic_type: SemanticType::Tag.into(),
753 datatype_extension: None,
754 options: None,
755 });
756 column_indexer.insert(k, schemas.len() - 1);
757
758 row.push(GreptimeValue {
759 value_data: Some(ValueData::StringValue(v)),
760 });
761 }
762 }
763}
764
765#[cfg(test)]
766mod tests {
767 use std::collections::BTreeMap;
768
769 use loki_proto::prost_types::Timestamp;
770
771 use crate::error::Error::InvalidLokiLabels;
772 use crate::http::loki::{parse_loki_labels, prost_ts_to_nano};
773
774 #[test]
775 fn test_ts_to_nano() {
776 let ts = Timestamp {
780 seconds: 1731748568,
781 nanos: 804293888,
782 };
783 assert_eq!(prost_ts_to_nano(&ts), 1731748568804293888);
784 }
785
786 #[test]
787 fn test_parse_loki_labels() {
788 let mut expected = BTreeMap::new();
789 expected.insert("job".to_string(), "foobar".to_string());
790 expected.insert("cluster".to_string(), "foo-central1".to_string());
791 expected.insert("namespace".to_string(), "bar".to_string());
792 expected.insert("container_name".to_string(), "buzz".to_string());
793
794 let valid_labels =
796 r#"{job="foobar", cluster="foo-central1", namespace="bar", container_name="buzz"}"#;
797 let re = parse_loki_labels(valid_labels);
798 assert!(re.is_ok());
799 assert_eq!(re.unwrap(), expected);
800
801 let too_short = r#"}"#;
803 let re = parse_loki_labels(too_short);
804 assert!(matches!(re.err().unwrap(), InvalidLokiLabels { .. }));
805
806 let missing_start = r#"job="foobar"}"#;
808 let re = parse_loki_labels(missing_start);
809 assert!(matches!(re.err().unwrap(), InvalidLokiLabels { .. }));
810
811 let missing_end = r#"{job="foobar""#;
813 let re = parse_loki_labels(missing_end);
814 assert!(matches!(re.err().unwrap(), InvalidLokiLabels { .. }));
815
816 let missing_equal = r#"{job"foobar"}"#;
818 let re = parse_loki_labels(missing_equal);
819 assert!(matches!(re.err().unwrap(), InvalidLokiLabels { .. }));
820
821 let missing_quote = r#"{job=foobar}"#;
823 let re = parse_loki_labels(missing_quote);
824 assert!(matches!(re.err().unwrap(), InvalidLokiLabels { .. }));
825
826 let missing_comma = r#"{job="foobar" cluster="foo-central1"}"#;
828 let re = parse_loki_labels(missing_comma);
829 assert!(matches!(re.err().unwrap(), InvalidLokiLabels { .. }));
830 }
831}