1use std::collections::{BTreeMap, VecDeque};
16use std::sync::Arc;
17use std::time::Instant;
18
19use api::v1::value::ValueData;
20use api::v1::{
21 ColumnDataType, ColumnDataTypeExtension, ColumnSchema, JsonTypeExtension, Row,
22 RowInsertRequest, Rows, SemanticType, Value as GreptimeValue,
23};
24use axum::Extension;
25use axum::extract::State;
26use axum_extra::TypedHeader;
27use bytes::Bytes;
28use chrono::DateTime;
29use common_query::prelude::greptime_timestamp;
30use common_telemetry::{error, warn};
31use headers::ContentType;
32use jsonb::Value;
33use lazy_static::lazy_static;
34use loki_proto::logproto::LabelPairAdapter;
35use loki_proto::prost_types::Timestamp as LokiTimestamp;
36use pipeline::util::to_pipeline_version;
37use pipeline::{
38 ContextReq, GreptimePipelineParams, PipelineContext, PipelineDefinition, SchemaInfo,
39};
40use prost::Message;
41use quoted_string::test_utils::TestSpec;
42use session::context::{Channel, QueryContext, QueryContextRef};
43use snafu::{OptionExt, ResultExt, ensure};
44use snap::raw::Decoder;
45use table::requests::{SEMANTIC_SIGNAL_TYPE, SEMANTIC_SOURCE, SIGNAL_TYPE_LOG, SOURCE_LOKI};
46use vrl::value::{KeyString, Value as VrlValue};
47
48use crate::error::{
49 DecodeLokiRequestSnafu, DecompressSnappyLokiRequestSnafu, InvalidLokiLabelsSnafu,
50 InvalidLokiPayloadSnafu, ParseJsonSnafu, PipelineSnafu, Result, UnsupportedContentTypeSnafu,
51};
52use crate::http::HttpResponse;
53use crate::http::event::{
54 JSON_CONTENT_TYPE, LogState, PB_CONTENT_TYPE, PipelineIngestRequest, execute_log_context_req,
55};
56use crate::http::extractor::{LogTableName, PipelineInfo};
57use crate::metrics::{METRIC_LOKI_LOGS_INGESTION_COUNTER, METRIC_LOKI_LOGS_INGESTION_ELAPSED};
58use crate::pipeline::run_pipeline;
59use crate::query_handler::PipelineHandlerRef;
60
/// Table used when the request does not supply a table name.
const LOKI_TABLE_NAME: &str = "loki_logs";
/// Column that stores the log line on the direct (non-pipeline) ingest path.
const LOKI_LINE_COLUMN: &str = "line";
/// Column that stores the per-entry structured metadata on the direct ingest path.
const LOKI_STRUCTURED_METADATA_COLUMN: &str = "structured_metadata";

/// Key under which the log line is placed in the map handed to a pipeline.
const LOKI_LINE_COLUMN_NAME: &str = "loki_line";

/// Prefix added to structured-metadata keys in the map handed to a pipeline.
const LOKI_PIPELINE_METADATA_PREFIX: &str = "loki_metadata_";
/// Prefix added to stream label names in the map handed to a pipeline.
const LOKI_PIPELINE_LABEL_PREFIX: &str = "loki_label_";

/// Top-level key of a JSON push payload that holds the array of streams.
const STREAMS_KEY: &str = "streams";
/// Per-stream key that holds the label object.
const LABEL_KEY: &str = "stream";
/// Per-stream key that holds the array of log entries.
const LINES_KEY: &str = "values";
73
// Fixed leading columns of every row built by the direct ingest path, in
// order: timestamp (nanoseconds), log line, structured metadata (a binary
// column tagged with the JSON type extension). Label columns discovered
// while ingesting are appended after these three.
lazy_static! {
    static ref LOKI_INIT_SCHEMAS: Vec<ColumnSchema> = vec![
        ColumnSchema {
            column_name: greptime_timestamp().to_string(),
            datatype: ColumnDataType::TimestampNanosecond.into(),
            semantic_type: SemanticType::Timestamp.into(),
            datatype_extension: None,
            options: None,
        },
        ColumnSchema {
            column_name: LOKI_LINE_COLUMN.to_string(),
            datatype: ColumnDataType::String.into(),
            semantic_type: SemanticType::Field.into(),
            datatype_extension: None,
            options: None,
        },
        ColumnSchema {
            column_name: LOKI_STRUCTURED_METADATA_COLUMN.to_string(),
            datatype: ColumnDataType::Binary.into(),
            semantic_type: SemanticType::Field.into(),
            datatype_extension: Some(ColumnDataTypeExtension {
                type_ext: Some(api::v1::column_data_type_extension::TypeExt::JsonType(
                    JsonTypeExtension::JsonBinary.into()
                ))
            }),
            options: None,
        }
    ];
}
103
104#[axum_macros::debug_handler]
105pub async fn loki_ingest(
106 State(log_state): State<LogState>,
107 Extension(mut ctx): Extension<QueryContext>,
108 TypedHeader(content_type): TypedHeader<ContentType>,
109 LogTableName(table_name): LogTableName,
110 pipeline_info: PipelineInfo,
111 bytes: Bytes,
112) -> Result<HttpResponse> {
113 ctx.set_channel(Channel::Loki);
114 ctx.set_extension(SEMANTIC_SIGNAL_TYPE, SIGNAL_TYPE_LOG);
115 ctx.set_extension(SEMANTIC_SOURCE, SOURCE_LOKI);
116 let ctx = Arc::new(ctx);
117 let table_name = table_name.unwrap_or_else(|| LOKI_TABLE_NAME.to_string());
118 let handler = log_state.log_handler;
119 let exec_timer = Instant::now();
122
123 let ctx_req = build_loki_context_req(
124 &handler,
125 content_type,
126 table_name,
127 pipeline_info,
128 bytes,
129 &ctx,
130 )
131 .await?;
132
133 execute_log_context_req(
134 handler,
135 ctx_req,
136 ctx,
137 exec_timer,
138 &METRIC_LOKI_LOGS_INGESTION_COUNTER,
139 &METRIC_LOKI_LOGS_INGESTION_ELAPSED,
140 )
141 .await
142}
143
/// One log entry as produced by either payload parser, before it is converted
/// into its final form ([`LokiRawItem`] or [`LokiPipeline`]).
///
/// `T` is the parser-specific representation of structured metadata:
/// `VrlValue` for JSON payloads and `Vec<LabelPairAdapter>` for protobuf ones.
pub struct LokiMiddleItem<T> {
    /// Entry timestamp; stored as a nanosecond timestamp value on the direct path.
    pub ts: i64,
    /// The log line text.
    pub line: String,
    /// Structured metadata attached to the entry, if any.
    pub structured_metadata: Option<T>,
    /// Labels of the stream this entry belongs to, if they could be read.
    pub labels: Option<BTreeMap<String, String>>,
}
153
/// A log entry prepared for the direct (non-pipeline) ingest path.
pub struct LokiRawItem {
    /// Entry timestamp; written as a nanosecond timestamp value.
    pub ts: i64,
    /// The log line text.
    pub line: String,
    /// Structured metadata already serialized to jsonb bytes.
    pub structured_metadata: Vec<u8>,
    /// Labels of the stream this entry belongs to, if they could be read.
    pub labels: Option<BTreeMap<String, String>>,
}
163
/// A log entry prepared as input for a pipeline.
pub struct LokiPipeline {
    /// A `VrlValue::Object` holding the timestamp, the line, and the prefixed
    /// structured-metadata and label entries.
    pub map: VrlValue,
}
168
/// Bundles everything needed to ingest one request through a named pipeline,
/// so it can be passed to `build_loki_pipeline_context_req` as one argument.
struct LokiPipelineContextReq {
    /// Content type of the request body; selects the payload parser.
    content_type: ContentType,
    /// Destination table name.
    table_name: String,
    /// Name of the pipeline to run.
    pipeline_name: String,
    /// Optional pipeline version string, parsed with `to_pipeline_version`.
    pipeline_version: Option<String>,
    /// Parameters handed to the pipeline context.
    pipeline_params: GreptimePipelineParams,
    /// Raw request body.
    bytes: Bytes,
}
177
178async fn build_loki_context_req(
179 handler: &PipelineHandlerRef,
180 content_type: ContentType,
181 table_name: String,
182 pipeline_info: PipelineInfo,
183 bytes: Bytes,
184 ctx: &QueryContextRef,
185) -> Result<ContextReq> {
186 match pipeline_info.pipeline_name {
189 Some(pipeline_name) => {
190 let pipeline_req = LokiPipelineContextReq {
191 content_type,
192 table_name,
193 pipeline_name,
194 pipeline_version: pipeline_info.pipeline_version,
195 pipeline_params: pipeline_info.pipeline_params,
196 bytes,
197 };
198 build_loki_pipeline_context_req(handler, pipeline_req, ctx).await
199 }
200 None => {
201 let req = build_loki_raw_insert_request(content_type, table_name, bytes)?;
202 Ok(ContextReq::default_opt_with_reqs(vec![req]))
203 }
204 }
205}
206
207async fn build_loki_pipeline_context_req(
208 handler: &PipelineHandlerRef,
209 pipeline_req: LokiPipelineContextReq,
210 ctx: &QueryContextRef,
211) -> Result<ContextReq> {
212 let LokiPipelineContextReq {
213 content_type,
214 table_name,
215 pipeline_name,
216 pipeline_version,
217 pipeline_params,
218 bytes,
219 } = pipeline_req;
220
221 let version = to_pipeline_version(pipeline_version.as_deref()).context(PipelineSnafu)?;
222 let def =
223 PipelineDefinition::from_name(&pipeline_name, version, None).context(PipelineSnafu)?;
224 let pipeline_ctx = PipelineContext::new(&def, &pipeline_params, Channel::Loki);
225
226 let values = extract_item::<LokiPipeline>(content_type, bytes)?
227 .map(|item| item.map)
228 .collect::<Vec<_>>();
229
230 let req = PipelineIngestRequest {
231 table: table_name,
232 values,
233 };
234
235 run_pipeline(handler, &pipeline_ctx, req, ctx, true).await
236}
237
238fn build_loki_raw_insert_request(
239 content_type: ContentType,
240 table_name: String,
241 bytes: Bytes,
242) -> Result<RowInsertRequest> {
243 let mut schema_info = SchemaInfo::from_schema_list(LOKI_INIT_SCHEMAS.clone());
244 let mut rows = Vec::with_capacity(256);
245 for loki_row in extract_item::<LokiRawItem>(content_type, bytes)? {
246 let mut row = init_row(
247 schema_info.schema.len(),
248 loki_row.ts,
249 loki_row.line,
250 loki_row.structured_metadata,
251 );
252 process_labels(&mut schema_info, &mut row, loki_row.labels);
253 rows.push(row);
254 }
255
256 let schemas = schema_info.column_schemas()?;
257 for row in rows.iter_mut() {
259 row.resize(schemas.len(), GreptimeValue::default());
260 }
261 let rows = Rows {
262 rows: rows.into_iter().map(|values| Row { values }).collect(),
263 schema: schemas,
264 };
265
266 Ok(RowInsertRequest {
267 table_name,
268 rows: Some(rows),
269 })
270}
271
272fn extract_item<T>(content_type: ContentType, bytes: Bytes) -> Result<Box<dyn Iterator<Item = T>>>
279where
280 LokiMiddleItem<VrlValue>: Into<T>,
281 LokiMiddleItem<Vec<LabelPairAdapter>>: Into<T>,
282{
283 match content_type {
284 x if x == *JSON_CONTENT_TYPE => Ok(Box::new(
285 LokiJsonParser::from_bytes(bytes)?.flat_map(|item| item.into_iter().map(|i| i.into())),
286 )),
287 x if x == *PB_CONTENT_TYPE => Ok(Box::new(
288 LokiPbParser::from_bytes(bytes)?.flat_map(|item| item.into_iter().map(|i| i.into())),
289 )),
290 _ => UnsupportedContentTypeSnafu { content_type }.fail(),
291 }
292}
293
/// Iterator over the streams of a JSON push payload.
struct LokiJsonParser {
    /// Streams not yet consumed, in payload order.
    pub streams: VecDeque<VrlValue>,
}
297
298impl LokiJsonParser {
299 pub fn from_bytes(bytes: Bytes) -> Result<Self> {
300 let payload: VrlValue = serde_json::from_slice(bytes.as_ref()).context(ParseJsonSnafu)?;
301
302 let VrlValue::Object(mut map) = payload else {
303 return InvalidLokiPayloadSnafu {
304 msg: "payload is not an object",
305 }
306 .fail();
307 };
308
309 let streams = map.remove(STREAMS_KEY).context(InvalidLokiPayloadSnafu {
310 msg: "missing streams",
311 })?;
312
313 let VrlValue::Array(streams) = streams else {
314 return InvalidLokiPayloadSnafu {
315 msg: "streams is not an array",
316 }
317 .fail();
318 };
319
320 Ok(Self {
321 streams: streams.into(),
322 })
323 }
324}
325
326impl Iterator for LokiJsonParser {
327 type Item = JsonStreamItem;
328
329 fn next(&mut self) -> Option<Self::Item> {
330 while let Some(stream) = self.streams.pop_front() {
331 let VrlValue::Object(mut map) = stream else {
333 warn!("stream is not an object, {:?}", stream);
334 continue;
335 };
336 let Some(lines) = map.remove(LINES_KEY) else {
337 warn!("missing lines on stream, {:?}", map);
338 continue;
339 };
340 let VrlValue::Array(lines) = lines else {
341 warn!("lines is not an array, {:?}", lines);
342 continue;
343 };
344
345 let labels = map
347 .remove(LABEL_KEY)
348 .and_then(|m| match m {
349 VrlValue::Object(labels) => Some(labels),
350 _ => None,
351 })
352 .map(|m| {
353 m.into_iter()
354 .filter_map(|(k, v)| match v {
355 VrlValue::Bytes(v) => {
356 Some((k.into(), String::from_utf8_lossy(&v).to_string()))
357 }
358 _ => None,
359 })
360 .collect::<BTreeMap<String, String>>()
361 });
362
363 return Some(JsonStreamItem {
364 lines: lines.into(),
365 labels,
366 });
367 }
368 None
369 }
370}
371
/// One stream of a JSON push payload; iterates over the stream's entries.
struct JsonStreamItem {
    /// Entries not yet consumed; each is expected to be an array of
    /// `[timestamp, line, optional structured metadata]`.
    pub lines: VecDeque<VrlValue>,
    /// Labels of the stream, cloned into every emitted entry.
    pub labels: Option<BTreeMap<String, String>>,
}
376
377impl Iterator for JsonStreamItem {
378 type Item = LokiMiddleItem<VrlValue>;
379
380 fn next(&mut self) -> Option<Self::Item> {
381 while let Some(line) = self.lines.pop_front() {
382 let VrlValue::Array(line) = line else {
383 warn!("line is not an array, {:?}", line);
384 continue;
385 };
386 if line.len() < 2 {
387 warn!("line is too short, {:?}", line);
388 continue;
389 }
390 let mut line: VecDeque<VrlValue> = line.into();
391
392 let ts = line.pop_front().and_then(|ts| match ts {
394 VrlValue::Bytes(ts) => String::from_utf8_lossy(&ts).parse::<i64>().ok(),
395 _ => {
396 warn!("missing or invalid timestamp, {:?}", ts);
397 None
398 }
399 });
400 let Some(ts) = ts else {
401 continue;
402 };
403
404 let line_text = line.pop_front().and_then(|l| match l {
405 VrlValue::Bytes(l) => Some(String::from_utf8_lossy(&l).to_string()),
406 _ => {
407 warn!("missing or invalid line, {:?}", l);
408 None
409 }
410 });
411 let Some(line_text) = line_text else {
412 continue;
413 };
414
415 let structured_metadata = line.pop_front();
416
417 return Some(LokiMiddleItem {
418 ts,
419 line: line_text,
420 structured_metadata,
421 labels: self.labels.clone(),
422 });
423 }
424 None
425 }
426}
427
/// Key/value map assembled for one entry before it is wrapped in a
/// `VrlValue::Object` and handed to a pipeline.
type LokiPipelineMap = BTreeMap<KeyString, VrlValue>;
429
430fn vrl_metadata_to_jsonb(structured_metadata: Option<VrlValue>) -> Vec<u8> {
431 let structured_metadata = structured_metadata
433 .and_then(|metadata| match metadata {
434 VrlValue::Object(metadata) => Some(metadata),
435 _ => None,
436 })
437 .map(|metadata| {
438 metadata
439 .into_iter()
440 .filter_map(|(key, value)| match value {
441 VrlValue::Bytes(bytes) => Some((
442 key.into(),
443 Value::String(String::from_utf8_lossy(&bytes).to_string().into()),
444 )),
445 _ => None,
446 })
447 .collect::<BTreeMap<String, Value>>()
448 })
449 .unwrap_or_default();
450
451 Value::Object(structured_metadata).to_vec()
452}
453
454fn label_pair_metadata_to_jsonb(structured_metadata: Option<Vec<LabelPairAdapter>>) -> Vec<u8> {
455 let structured_metadata = structured_metadata
457 .unwrap_or_default()
458 .into_iter()
459 .map(|metadata| (metadata.name, Value::String(metadata.value.into())))
460 .collect::<BTreeMap<String, Value>>();
461
462 Value::Object(structured_metadata).to_vec()
463}
464
465fn new_loki_pipeline_map(ts: i64, line: String) -> LokiPipelineMap {
466 let mut map = BTreeMap::new();
467 map.insert(
468 KeyString::from(greptime_timestamp()),
469 VrlValue::Timestamp(DateTime::from_timestamp_nanos(ts)),
470 );
471 map.insert(
472 KeyString::from(LOKI_LINE_COLUMN_NAME),
473 VrlValue::Bytes(line.into()),
474 );
475 map
476}
477
478fn append_vrl_pipeline_metadata(map: &mut LokiPipelineMap, structured_metadata: Option<VrlValue>) {
479 if let Some(VrlValue::Object(metadata)) = structured_metadata {
480 for (key, value) in metadata {
481 map.insert(
482 KeyString::from(format!("{}{}", LOKI_PIPELINE_METADATA_PREFIX, key)),
483 value,
484 );
485 }
486 }
487}
488
489fn append_label_pair_pipeline_metadata(
490 map: &mut LokiPipelineMap,
491 structured_metadata: Option<Vec<LabelPairAdapter>>,
492) {
493 for metadata in structured_metadata.unwrap_or_default() {
494 map.insert(
495 KeyString::from(format!(
496 "{}{}",
497 LOKI_PIPELINE_METADATA_PREFIX, metadata.name
498 )),
499 VrlValue::Bytes(metadata.value.into()),
500 );
501 }
502}
503
504fn append_pipeline_labels(map: &mut LokiPipelineMap, labels: Option<BTreeMap<String, String>>) {
505 if let Some(labels) = labels {
506 for (key, value) in labels {
507 map.insert(
508 KeyString::from(format!("{}{}", LOKI_PIPELINE_LABEL_PREFIX, key)),
509 VrlValue::Bytes(value.into()),
510 );
511 }
512 }
513}
514
515impl From<LokiMiddleItem<VrlValue>> for LokiRawItem {
516 fn from(val: LokiMiddleItem<VrlValue>) -> Self {
517 let LokiMiddleItem {
518 ts,
519 line,
520 structured_metadata,
521 labels,
522 } = val;
523
524 LokiRawItem {
525 ts,
526 line,
527 structured_metadata: vrl_metadata_to_jsonb(structured_metadata),
528 labels,
529 }
530 }
531}
532
533impl From<LokiMiddleItem<VrlValue>> for LokiPipeline {
534 fn from(value: LokiMiddleItem<VrlValue>) -> Self {
535 let LokiMiddleItem {
536 ts,
537 line,
538 structured_metadata,
539 labels,
540 } = value;
541
542 let mut map = new_loki_pipeline_map(ts, line);
543 append_vrl_pipeline_metadata(&mut map, structured_metadata);
544 append_pipeline_labels(&mut map, labels);
545
546 LokiPipeline {
547 map: VrlValue::Object(map),
548 }
549 }
550}
551
/// Iterator over the streams of a protobuf push request.
pub struct LokiPbParser {
    /// Streams not yet consumed, in request order.
    pub streams: VecDeque<loki_proto::logproto::StreamAdapter>,
}
555
556impl LokiPbParser {
557 pub fn from_bytes(bytes: Bytes) -> Result<Self> {
558 let decompressed = snappy_decompress_loki_request(&bytes)?;
559 let req = loki_proto::logproto::PushRequest::decode(&decompressed[..])
560 .context(DecodeLokiRequestSnafu)?;
561
562 Ok(Self {
563 streams: req.streams.into(),
564 })
565 }
566}
567
568fn snappy_decompress_loki_request(buf: &[u8]) -> Result<Vec<u8>> {
569 let mut decoder = Decoder::new();
572 decoder
573 .decompress_vec(buf)
574 .context(DecompressSnappyLokiRequestSnafu)
575}
576
577impl Iterator for LokiPbParser {
578 type Item = PbStreamItem;
579
580 fn next(&mut self) -> Option<Self::Item> {
581 let stream = self.streams.pop_front()?;
582
583 let labels = parse_loki_labels(&stream.labels)
584 .inspect_err(|e| {
585 error!(e; "failed to parse loki labels, {:?}", stream.labels);
586 })
587 .ok();
588
589 Some(PbStreamItem {
590 entries: stream.entries.into(),
591 labels,
592 })
593 }
594}
595
/// One stream of a protobuf push request; iterates over the stream's entries.
pub struct PbStreamItem {
    /// Entries not yet consumed, in request order.
    pub entries: VecDeque<loki_proto::logproto::EntryAdapter>,
    /// Parsed labels of the stream, or `None` if the label string was invalid.
    pub labels: Option<BTreeMap<String, String>>,
}
600
601impl Iterator for PbStreamItem {
602 type Item = LokiMiddleItem<Vec<LabelPairAdapter>>;
603
604 fn next(&mut self) -> Option<Self::Item> {
605 while let Some(entry) = self.entries.pop_front() {
606 let ts = if let Some(ts) = entry.timestamp {
607 ts
608 } else {
609 warn!("missing timestamp, {:?}", entry);
610 continue;
611 };
612 let line = entry.line;
613
614 let structured_metadata = entry.structured_metadata;
615
616 return Some(LokiMiddleItem {
617 ts: prost_ts_to_nano(&ts),
618 line,
619 structured_metadata: Some(structured_metadata),
620 labels: self.labels.clone(),
621 });
622 }
623 None
624 }
625}
626
627impl From<LokiMiddleItem<Vec<LabelPairAdapter>>> for LokiRawItem {
628 fn from(val: LokiMiddleItem<Vec<LabelPairAdapter>>) -> Self {
629 let LokiMiddleItem {
630 ts,
631 line,
632 structured_metadata,
633 labels,
634 } = val;
635
636 LokiRawItem {
637 ts,
638 line,
639 structured_metadata: label_pair_metadata_to_jsonb(structured_metadata),
640 labels,
641 }
642 }
643}
644
645impl From<LokiMiddleItem<Vec<LabelPairAdapter>>> for LokiPipeline {
646 fn from(value: LokiMiddleItem<Vec<LabelPairAdapter>>) -> Self {
647 let LokiMiddleItem {
648 ts,
649 line,
650 structured_metadata,
651 labels,
652 } = value;
653
654 let mut map = new_loki_pipeline_map(ts, line);
655 append_label_pair_pipeline_metadata(&mut map, structured_metadata);
656 append_pipeline_labels(&mut map, labels);
657
658 LokiPipeline {
659 map: VrlValue::Object(map),
660 }
661 }
662}
663
664pub fn parse_loki_labels(labels: &str) -> Result<BTreeMap<String, String>> {
670 let mut labels = labels.trim();
671 ensure!(
672 labels.len() >= 2,
673 InvalidLokiLabelsSnafu {
674 msg: "labels string too short"
675 }
676 );
677 ensure!(
678 labels.starts_with("{"),
679 InvalidLokiLabelsSnafu {
680 msg: "missing `{` at the beginning"
681 }
682 );
683 ensure!(
684 labels.ends_with("}"),
685 InvalidLokiLabelsSnafu {
686 msg: "missing `}` at the end"
687 }
688 );
689
690 let mut result = BTreeMap::new();
691 labels = &labels[1..labels.len() - 1];
692
693 while !labels.is_empty() {
694 let first_index = labels.find("=").with_context(|| InvalidLokiLabelsSnafu {
696 msg: format!("missing `=` near: {}", labels),
697 })?;
698 let key = &labels[..first_index];
699 labels = &labels[first_index + 1..];
700
701 let qs = quoted_string::parse::<TestSpec>(labels)
703 .map_err(|e| {
704 InvalidLokiLabelsSnafu {
705 msg: format!(
706 "failed to parse quoted string near: {}, reason: {}",
707 labels, e.1
708 ),
709 }
710 .build()
711 })?
712 .quoted_string;
713
714 labels = &labels[qs.len()..];
715
716 let value = quoted_string::to_content::<TestSpec>(qs).map_err(|e| {
717 InvalidLokiLabelsSnafu {
718 msg: format!("failed to unquote the string: {}, reason: {}", qs, e),
719 }
720 .build()
721 })?;
722
723 result.insert(key.to_string(), value.to_string());
725
726 if labels.is_empty() {
727 break;
728 }
729 ensure!(
730 labels.starts_with(","),
731 InvalidLokiLabelsSnafu { msg: "missing `,`" }
732 );
733 labels = labels[1..].trim_start();
734 }
735
736 Ok(result)
737}
738
739#[inline]
740fn prost_ts_to_nano(ts: &LokiTimestamp) -> i64 {
741 ts.seconds * 1_000_000_000 + ts.nanos as i64
742}
743
744fn init_row(
745 schema_len: usize,
746 ts: i64,
747 line: String,
748 structured_metadata: Vec<u8>,
749) -> Vec<GreptimeValue> {
750 let mut row = Vec::with_capacity(schema_len);
752 row.push(GreptimeValue {
754 value_data: Some(ValueData::TimestampNanosecondValue(ts)),
755 });
756 row.push(GreptimeValue {
757 value_data: Some(ValueData::StringValue(line)),
758 });
759 row.push(GreptimeValue {
760 value_data: Some(ValueData::BinaryValue(structured_metadata)),
761 });
762 for _ in 0..(schema_len - 3) {
763 row.push(GreptimeValue { value_data: None });
764 }
765 row
766}
767
768fn process_labels(
769 schema_info: &mut SchemaInfo,
770 row: &mut Vec<GreptimeValue>,
771 labels: Option<BTreeMap<String, String>>,
772) {
773 let Some(labels) = labels else {
774 return;
775 };
776
777 let column_indexer = &mut schema_info.index;
778 let schemas = &mut schema_info.schema;
779
780 for (k, v) in labels {
782 if let Some(index) = column_indexer.get(&k) {
783 row[*index] = GreptimeValue {
786 value_data: Some(ValueData::StringValue(v)),
787 };
788 } else {
789 schemas.push(
792 ColumnSchema {
793 column_name: k.clone(),
794 datatype: ColumnDataType::String.into(),
795 semantic_type: SemanticType::Tag.into(),
796 datatype_extension: None,
797 options: None,
798 }
799 .into(),
800 );
801 column_indexer.insert(k, schemas.len() - 1);
802
803 row.push(GreptimeValue {
804 value_data: Some(ValueData::StringValue(v)),
805 });
806 }
807 }
808}
809
#[cfg(test)]
mod tests {
    use std::collections::BTreeMap;

    use bytes::Bytes;
    use loki_proto::logproto::{EntryAdapter, PushRequest, StreamAdapter};
    use loki_proto::prost_types::Timestamp;
    use prost::Message;

    use super::*;
    use crate::error::Error::{DecompressSnappyLokiRequest, InvalidLokiLabels};
    use crate::prom_store::snappy_compress;

    /// Two streams with partly different label sets; only the first entry
    /// carries structured metadata.
    const JSON_PAYLOAD: &[u8] = br#"{
        "streams": [
            {
                "stream": {
                    "job": "api",
                    "namespace": "prod"
                },
                "values": [
                    ["1731748568804293888", "line one", {"trace_id": "abc"}]
                ]
            },
            {
                "stream": {
                    "job": "worker",
                    "pod": "worker-0"
                },
                "values": [
                    ["1731748568804293889", "line two"]
                ]
            }
        ]
    }"#;

    /// Returns the string stored in cell `index`, or `None` for any other cell kind.
    fn row_string_value(row: &Row, index: usize) -> Option<&str> {
        if let Some(ValueData::StringValue(text)) = row.values[index].value_data.as_ref() {
            Some(text.as_str())
        } else {
            None
        }
    }

    /// Returns the bytes stored under `key` as a string, or `None` if the key
    /// is missing or holds a non-bytes value.
    fn pipeline_bytes_value(map: &BTreeMap<KeyString, VrlValue>, key: &str) -> Option<String> {
        let found = map.get(&KeyString::from(key))?;
        if let VrlValue::Bytes(raw) = found {
            Some(String::from_utf8_lossy(raw.as_ref()).to_string())
        } else {
            None
        }
    }

    #[test]
    fn test_ts_to_nano() {
        let (seconds, nanos) = (1731748568, 804293888);
        let nanoseconds = prost_ts_to_nano(&Timestamp { seconds, nanos });
        assert_eq!(nanoseconds, 1731748568804293888);
    }

    #[test]
    fn test_json_direct_ingest_builds_schema_and_pads_rows() {
        let request = build_loki_raw_insert_request(
            JSON_CONTENT_TYPE.clone(),
            "custom_loki".to_string(),
            Bytes::from_static(JSON_PAYLOAD),
        )
        .unwrap();
        assert_eq!(request.table_name, "custom_loki");

        let Rows { schema, rows } = request.rows.unwrap();

        // Fixed columns first, then labels in order of first appearance.
        let names: Vec<&str> = schema.iter().map(|c| c.column_name.as_str()).collect();
        assert_eq!(
            names,
            vec![
                greptime_timestamp(),
                LOKI_LINE_COLUMN,
                LOKI_STRUCTURED_METADATA_COLUMN,
                "job",
                "namespace",
                "pod",
            ]
        );
        for label_column in &schema[3..] {
            assert_eq!(label_column.semantic_type, SemanticType::Tag as i32);
        }

        assert_eq!(rows.len(), 2);
        let (first, second) = (&rows[0], &rows[1]);

        // The first row predates the `pod` column and must be padded.
        assert_eq!(first.values.len(), schema.len());
        assert_eq!(row_string_value(first, 1), Some("line one"));
        assert_eq!(row_string_value(first, 3), Some("api"));
        assert_eq!(row_string_value(first, 4), Some("prod"));
        assert!(first.values[5].value_data.is_none());

        // The second row has no `namespace` label.
        assert_eq!(second.values.len(), schema.len());
        assert_eq!(row_string_value(second, 1), Some("line two"));
        assert_eq!(row_string_value(second, 3), Some("worker"));
        assert!(second.values[4].value_data.is_none());
        assert_eq!(row_string_value(second, 5), Some("worker-0"));
    }

    #[test]
    fn test_json_pipeline_conversion_names_loki_fields() {
        let items: Vec<LokiPipeline> = extract_item::<LokiPipeline>(
            JSON_CONTENT_TYPE.clone(),
            Bytes::from_static(JSON_PAYLOAD),
        )
        .unwrap()
        .collect();
        assert_eq!(items.len(), 2);

        let map = match &items[0].map {
            VrlValue::Object(map) => map,
            _ => panic!("expected pipeline object"),
        };

        let timestamp = map.get(&KeyString::from(greptime_timestamp()));
        assert!(matches!(timestamp, Some(VrlValue::Timestamp(_))));

        let expected_fields = [
            (LOKI_LINE_COLUMN_NAME, "line one"),
            ("loki_label_job", "api"),
            ("loki_label_namespace", "prod"),
            ("loki_metadata_trace_id", "abc"),
        ];
        for (key, expected) in expected_fields {
            assert_eq!(pipeline_bytes_value(map, key), Some(expected.to_string()));
        }
    }

    #[test]
    fn test_protobuf_parser_decodes_snappy_push_request() {
        let entry = EntryAdapter {
            timestamp: Some(Timestamp {
                seconds: 1731748568,
                nanos: 804293888,
            }),
            line: "line one".to_string(),
            structured_metadata: vec![LabelPairAdapter {
                name: "trace_id".to_string(),
                value: "abc".to_string(),
            }],
            ..Default::default()
        };
        let stream = StreamAdapter {
            labels: r#"{job="api"}"#.to_string(),
            entries: vec![entry],
            ..Default::default()
        };
        let request = PushRequest {
            streams: vec![stream],
        };
        let compressed = snappy_compress(&request.encode_to_vec()).unwrap();

        let items: Vec<LokiRawItem> =
            extract_item::<LokiRawItem>(PB_CONTENT_TYPE.clone(), Bytes::from(compressed))
                .unwrap()
                .collect();

        assert_eq!(items.len(), 1);
        let item = &items[0];
        assert_eq!(item.ts, 1731748568804293888);
        assert_eq!(item.line, "line one");
        assert_eq!(
            item.labels.as_ref().unwrap().get("job"),
            Some(&"api".to_string())
        );
        assert!(!item.structured_metadata.is_empty());
    }

    #[test]
    fn test_protobuf_parser_rejects_invalid_snappy_payload() {
        let result = LokiPbParser::from_bytes(Bytes::from_static(b"not-snappy"));
        let Err(err) = result else {
            panic!("expected invalid snappy payload to fail");
        };

        assert!(matches!(err, DecompressSnappyLokiRequest { .. }));
    }

    #[test]
    fn test_parse_loki_labels() {
        let expected: BTreeMap<String, String> = [
            ("job", "foobar"),
            ("cluster", "foo-central1"),
            ("namespace", "bar"),
            ("container_name", "buzz"),
        ]
        .into_iter()
        .map(|(k, v)| (k.to_string(), v.to_string()))
        .collect();

        let valid_labels =
            r#"{job="foobar", cluster="foo-central1", namespace="bar", container_name="buzz"}"#;
        let parsed = parse_loki_labels(valid_labels);
        assert!(parsed.is_ok());
        assert_eq!(parsed.unwrap(), expected);

        // In order: too short, missing `{`, missing `}`, missing `=`,
        // unquoted value, missing `,`.
        let invalid_inputs = [
            r#"}"#,
            r#"job="foobar"}"#,
            r#"{job="foobar""#,
            r#"{job"foobar"}"#,
            r#"{job=foobar}"#,
            r#"{job="foobar" cluster="foo-central1"}"#,
        ];
        for input in invalid_inputs {
            let result = parse_loki_labels(input);
            assert!(matches!(result.err().unwrap(), InvalidLokiLabels { .. }));
        }
    }
}