1use std::collections::BTreeMap;
16use std::sync::Arc;
17use std::time::Instant;
18
19use ahash::{HashMap, HashMapExt};
20use api::v1::value::ValueData;
21use api::v1::{
22 ColumnDataType, ColumnDataTypeExtension, ColumnSchema, JsonTypeExtension, Row,
23 RowInsertRequest, RowInsertRequests, Rows, SemanticType, Value as GreptimeValue,
24};
25use axum::extract::State;
26use axum::Extension;
27use axum_extra::TypedHeader;
28use bytes::Bytes;
29use common_query::prelude::GREPTIME_TIMESTAMP;
30use common_query::{Output, OutputData};
31use common_telemetry::{error, warn};
32use headers::ContentType;
33use jsonb::Value;
34use lazy_static::lazy_static;
35use loki_proto::prost_types::Timestamp;
36use prost::Message;
37use quoted_string::test_utils::TestSpec;
38use session::context::{Channel, QueryContext};
39use snafu::{ensure, OptionExt, ResultExt};
40
41use crate::error::{
42 DecodeOtlpRequestSnafu, InvalidLokiLabelsSnafu, InvalidLokiPayloadSnafu, ParseJsonSnafu,
43 Result, UnsupportedContentTypeSnafu,
44};
45use crate::http::event::{LogState, JSON_CONTENT_TYPE, PB_CONTENT_TYPE};
46use crate::http::extractor::LogTableName;
47use crate::http::result::greptime_result_v1::GreptimedbV1Response;
48use crate::http::HttpResponse;
49use crate::metrics::{
50 METRIC_FAILURE_VALUE, METRIC_LOKI_LOGS_INGESTION_COUNTER, METRIC_LOKI_LOGS_INGESTION_ELAPSED,
51 METRIC_SUCCESS_VALUE,
52};
53use crate::{prom_store, unwrap_or_warn_continue};
54
/// Table that receives the logs when the request does not name one.
const LOKI_TABLE_NAME: &str = "loki_logs";
/// Name of the column holding the text of each log line.
const LOKI_LINE_COLUMN: &str = "line";
/// Name of the column holding the structured metadata attached to each log line.
const LOKI_STRUCTURED_METADATA_COLUMN: &str = "structured_metadata";

/// Key of the stream array at the top level of a JSON push payload.
const STREAMS_KEY: &str = "streams";
/// Key of the label object inside each JSON stream.
const LABEL_KEY: &str = "stream";
/// Key of the log line array inside each JSON stream.
const LINES_KEY: &str = "values";
62
63lazy_static! {
64 static ref LOKI_INIT_SCHEMAS: Vec<ColumnSchema> = vec![
65 ColumnSchema {
66 column_name: GREPTIME_TIMESTAMP.to_string(),
67 datatype: ColumnDataType::TimestampNanosecond.into(),
68 semantic_type: SemanticType::Timestamp.into(),
69 datatype_extension: None,
70 options: None,
71 },
72 ColumnSchema {
73 column_name: LOKI_LINE_COLUMN.to_string(),
74 datatype: ColumnDataType::String.into(),
75 semantic_type: SemanticType::Field.into(),
76 datatype_extension: None,
77 options: None,
78 },
79 ColumnSchema {
80 column_name: LOKI_STRUCTURED_METADATA_COLUMN.to_string(),
81 datatype: ColumnDataType::Binary.into(),
82 semantic_type: SemanticType::Field.into(),
83 datatype_extension: Some(ColumnDataTypeExtension {
84 type_ext: Some(api::v1::column_data_type_extension::TypeExt::JsonType(
85 JsonTypeExtension::JsonBinary.into()
86 ))
87 }),
88 options: None,
89 }
90 ];
91}
92
93#[axum_macros::debug_handler]
94pub async fn loki_ingest(
95 State(log_state): State<LogState>,
96 Extension(mut ctx): Extension<QueryContext>,
97 TypedHeader(content_type): TypedHeader<ContentType>,
98 LogTableName(table_name): LogTableName,
99 bytes: Bytes,
100) -> Result<HttpResponse> {
101 ctx.set_channel(Channel::Loki);
102 let ctx = Arc::new(ctx);
103 let table_name = table_name.unwrap_or_else(|| LOKI_TABLE_NAME.to_string());
104 let db = ctx.get_db_string();
105 let db_str = db.as_str();
106 let exec_timer = Instant::now();
107
108 let mut schemas = LOKI_INIT_SCHEMAS.clone();
110
111 let mut rows = match content_type {
112 x if x == *JSON_CONTENT_TYPE => handle_json_req(bytes, &mut schemas).await,
113 x if x == *PB_CONTENT_TYPE => handle_pb_req(bytes, &mut schemas).await,
114 _ => UnsupportedContentTypeSnafu { content_type }.fail(),
115 }?;
116
117 for row in rows.iter_mut() {
119 row.resize(schemas.len(), GreptimeValue::default());
120 }
121
122 let rows = Rows {
123 rows: rows.into_iter().map(|values| Row { values }).collect(),
124 schema: schemas,
125 };
126 let ins_req = RowInsertRequest {
127 table_name,
128 rows: Some(rows),
129 };
130 let ins_reqs = RowInsertRequests {
131 inserts: vec![ins_req],
132 };
133
134 let handler = log_state.log_handler;
135 let output = handler.insert(ins_reqs, ctx).await;
136
137 if let Ok(Output {
138 data: OutputData::AffectedRows(rows),
139 meta: _,
140 }) = &output
141 {
142 METRIC_LOKI_LOGS_INGESTION_COUNTER
143 .with_label_values(&[db_str])
144 .inc_by(*rows as u64);
145 METRIC_LOKI_LOGS_INGESTION_ELAPSED
146 .with_label_values(&[db_str, METRIC_SUCCESS_VALUE])
147 .observe(exec_timer.elapsed().as_secs_f64());
148 } else {
149 METRIC_LOKI_LOGS_INGESTION_ELAPSED
150 .with_label_values(&[db_str, METRIC_FAILURE_VALUE])
151 .observe(exec_timer.elapsed().as_secs_f64());
152 }
153
154 let response = GreptimedbV1Response::from_output(vec![output])
155 .await
156 .with_execution_time(exec_timer.elapsed().as_millis() as u64);
157 Ok(response)
158}
159
160async fn handle_json_req(
161 bytes: Bytes,
162 schemas: &mut Vec<ColumnSchema>,
163) -> Result<Vec<Vec<GreptimeValue>>> {
164 let mut column_indexer: HashMap<String, u16> = HashMap::new();
165 column_indexer.insert(GREPTIME_TIMESTAMP.to_string(), 0);
166 column_indexer.insert(LOKI_LINE_COLUMN.to_string(), 1);
167
168 let payload: serde_json::Value =
169 serde_json::from_slice(bytes.as_ref()).context(ParseJsonSnafu)?;
170
171 let streams = payload
172 .get(STREAMS_KEY)
173 .context(InvalidLokiPayloadSnafu {
174 msg: "missing streams",
175 })?
176 .as_array()
177 .context(InvalidLokiPayloadSnafu {
178 msg: "streams is not an array",
179 })?;
180
181 let mut rows = Vec::with_capacity(1000);
182
183 for (stream_index, stream) in streams.iter().enumerate() {
184 let lines = unwrap_or_warn_continue!(
187 stream.get(LINES_KEY),
188 "missing values on stream {}",
189 stream_index
190 );
191 let lines = unwrap_or_warn_continue!(
192 lines.as_array(),
193 "values is not an array on stream {}",
194 stream_index
195 );
196
197 let labels = stream
199 .get(LABEL_KEY)
200 .and_then(|label| label.as_object())
201 .map(|l| {
202 l.iter()
203 .filter_map(|(k, v)| v.as_str().map(|v| (k.clone(), v.to_string())))
204 .collect::<BTreeMap<String, String>>()
205 });
206
207 for (line_index, line) in lines.iter().enumerate() {
209 let line = unwrap_or_warn_continue!(
210 line.as_array(),
211 "missing line on stream {} index {}",
212 stream_index,
213 line_index
214 );
215 if line.len() < 2 {
216 warn!(
217 "line on stream {} index {} is too short",
218 stream_index, line_index
219 );
220 continue;
221 }
222 let ts = unwrap_or_warn_continue!(
224 line.first()
225 .and_then(|ts| ts.as_str())
226 .and_then(|ts| ts.parse::<i64>().ok()),
227 "missing or invalid timestamp on stream {} index {}",
228 stream_index,
229 line_index
230 );
231 let line_text = unwrap_or_warn_continue!(
233 line.get(1)
234 .and_then(|line| line.as_str())
235 .map(|line| line.to_string()),
236 "missing or invalid line on stream {} index {}",
237 stream_index,
238 line_index
239 );
240
241 let structured_metadata = match line.get(2) {
242 Some(sdata) if sdata.is_object() => sdata
243 .as_object()
244 .unwrap()
245 .iter()
246 .filter_map(|(k, v)| v.as_str().map(|s| (k.clone(), Value::String(s.into()))))
247 .collect(),
248 _ => BTreeMap::new(),
249 };
250 let structured_metadata = Value::Object(structured_metadata);
251
252 let mut row = init_row(schemas.len(), ts, line_text, structured_metadata);
253
254 process_labels(&mut column_indexer, schemas, &mut row, labels.as_ref());
255
256 rows.push(row);
257 }
258 }
259
260 Ok(rows)
261}
262
263async fn handle_pb_req(
264 bytes: Bytes,
265 schemas: &mut Vec<ColumnSchema>,
266) -> Result<Vec<Vec<GreptimeValue>>> {
267 let decompressed = prom_store::snappy_decompress(&bytes).unwrap();
268 let req = loki_proto::logproto::PushRequest::decode(&decompressed[..])
269 .context(DecodeOtlpRequestSnafu)?;
270
271 let mut column_indexer: HashMap<String, u16> = HashMap::new();
272 column_indexer.insert(GREPTIME_TIMESTAMP.to_string(), 0);
273 column_indexer.insert(LOKI_LINE_COLUMN.to_string(), 1);
274
275 let cnt = req.streams.iter().map(|s| s.entries.len()).sum::<usize>();
276 let mut rows = Vec::with_capacity(cnt);
277
278 for stream in req.streams {
279 let labels = parse_loki_labels(&stream.labels)
280 .inspect_err(|e| {
281 error!(e; "failed to parse loki labels");
282 })
283 .ok();
284
285 for entry in stream.entries {
287 let ts = if let Some(ts) = entry.timestamp {
288 ts
289 } else {
290 continue;
291 };
292 let line = entry.line;
293
294 let structured_metadata = entry
295 .structured_metadata
296 .into_iter()
297 .map(|d| (d.name, Value::String(d.value.into())))
298 .collect::<BTreeMap<String, Value>>();
299 let structured_metadata = Value::Object(structured_metadata);
300
301 let mut row = init_row(
302 schemas.len(),
303 prost_ts_to_nano(&ts),
304 line,
305 structured_metadata,
306 );
307
308 process_labels(&mut column_indexer, schemas, &mut row, labels.as_ref());
309
310 rows.push(row);
311 }
312 }
313
314 Ok(rows)
315}
316
317pub fn parse_loki_labels(labels: &str) -> Result<BTreeMap<String, String>> {
323 let mut labels = labels.trim();
324 ensure!(
325 labels.len() >= 2,
326 InvalidLokiLabelsSnafu {
327 msg: "labels string too short"
328 }
329 );
330 ensure!(
331 labels.starts_with("{"),
332 InvalidLokiLabelsSnafu {
333 msg: "missing `{` at the beginning"
334 }
335 );
336 ensure!(
337 labels.ends_with("}"),
338 InvalidLokiLabelsSnafu {
339 msg: "missing `}` at the end"
340 }
341 );
342
343 let mut result = BTreeMap::new();
344 labels = &labels[1..labels.len() - 1];
345
346 while !labels.is_empty() {
347 let first_index = labels.find("=").with_context(|| InvalidLokiLabelsSnafu {
349 msg: format!("missing `=` near: {}", labels),
350 })?;
351 let key = &labels[..first_index];
352 labels = &labels[first_index + 1..];
353
354 let qs = quoted_string::parse::<TestSpec>(labels)
356 .map_err(|e| {
357 InvalidLokiLabelsSnafu {
358 msg: format!(
359 "failed to parse quoted string near: {}, reason: {}",
360 labels, e.1
361 ),
362 }
363 .build()
364 })?
365 .quoted_string;
366
367 labels = &labels[qs.len()..];
368
369 let value = quoted_string::to_content::<TestSpec>(qs).map_err(|e| {
370 InvalidLokiLabelsSnafu {
371 msg: format!("failed to unquote the string: {}, reason: {}", qs, e),
372 }
373 .build()
374 })?;
375
376 result.insert(key.to_string(), value.to_string());
378
379 if labels.is_empty() {
380 break;
381 }
382 ensure!(
383 labels.starts_with(","),
384 InvalidLokiLabelsSnafu { msg: "missing `,`" }
385 );
386 labels = labels[1..].trim_start();
387 }
388
389 Ok(result)
390}
391
392#[inline]
393fn prost_ts_to_nano(ts: &Timestamp) -> i64 {
394 ts.seconds * 1_000_000_000 + ts.nanos as i64
395}
396
397fn init_row(
398 schema_len: usize,
399 ts: i64,
400 line: String,
401 structured_metadata: Value,
402) -> Vec<GreptimeValue> {
403 let mut row = Vec::with_capacity(schema_len);
405 row.push(GreptimeValue {
407 value_data: Some(ValueData::TimestampNanosecondValue(ts)),
408 });
409 row.push(GreptimeValue {
410 value_data: Some(ValueData::StringValue(line)),
411 });
412 row.push(GreptimeValue {
413 value_data: Some(ValueData::BinaryValue(structured_metadata.to_vec())),
414 });
415 for _ in 0..(schema_len - 3) {
416 row.push(GreptimeValue { value_data: None });
417 }
418 row
419}
420
421fn process_labels(
422 column_indexer: &mut HashMap<String, u16>,
423 schemas: &mut Vec<ColumnSchema>,
424 row: &mut Vec<GreptimeValue>,
425 labels: Option<&BTreeMap<String, String>>,
426) {
427 let Some(labels) = labels else {
428 return;
429 };
430
431 for (k, v) in labels {
433 if let Some(index) = column_indexer.get(k) {
434 row[*index as usize] = GreptimeValue {
437 value_data: Some(ValueData::StringValue(v.clone())),
438 };
439 } else {
440 schemas.push(ColumnSchema {
443 column_name: k.clone(),
444 datatype: ColumnDataType::String.into(),
445 semantic_type: SemanticType::Tag.into(),
446 datatype_extension: None,
447 options: None,
448 });
449 column_indexer.insert(k.clone(), (schemas.len() - 1) as u16);
450
451 row.push(GreptimeValue {
452 value_data: Some(ValueData::StringValue(v.clone())),
453 });
454 }
455 }
456}
457
/// Evaluates to the value inside `Some`; on `None` it logs a warning and
/// `continue`s the enclosing loop.
///
/// Takes the `Option` expression followed by either a single message or a format
/// string with its arguments, all of which are forwarded to `warn!`. The `warn!`
/// macro must be in scope where this macro is invoked, and the invocation must be
/// inside a loop.
#[macro_export]
macro_rules! unwrap_or_warn_continue {
    ($expr:expr, $msg:expr) => {
        match $expr {
            Some(inner) => inner,
            None => {
                warn!($msg);
                continue;
            }
        }
    };

    ($expr:expr, $fmt:expr, $($arg:tt)*) => {
        match $expr {
            Some(inner) => inner,
            None => {
                warn!($fmt, $($arg)*);
                continue;
            }
        }
    };
}
478
#[cfg(test)]
mod tests {
    use std::collections::BTreeMap;

    use loki_proto::prost_types::Timestamp;

    use crate::error::Error::InvalidLokiLabels;
    use crate::http::loki::{parse_loki_labels, prost_ts_to_nano};

    /// Seconds and nanoseconds are combined into a single nanosecond value.
    #[test]
    fn test_ts_to_nano() {
        let ts = Timestamp {
            seconds: 1731748568,
            nanos: 804293888,
        };
        assert_eq!(prost_ts_to_nano(&ts), 1731748568804293888);
    }

    /// A well-formed label string parses into the expected map, and each kind of
    /// malformed string is rejected with `InvalidLokiLabels`.
    #[test]
    fn test_parse_loki_labels() {
        let expected: BTreeMap<String, String> = [
            ("job", "foobar"),
            ("cluster", "foo-central1"),
            ("namespace", "bar"),
            ("container_name", "buzz"),
        ]
        .into_iter()
        .map(|(name, value)| (name.to_string(), value.to_string()))
        .collect();

        let valid_labels =
            r#"{job="foobar", cluster="foo-central1", namespace="bar", container_name="buzz"}"#;
        let re = parse_loki_labels(valid_labels);
        assert!(re.is_ok());
        assert_eq!(re.unwrap(), expected);

        let invalid_labels = [
            // too short
            r#"}"#,
            // missing `{` at the beginning
            r#"job="foobar"}"#,
            // missing `}` at the end
            r#"{job="foobar""#,
            // missing `=`
            r#"{job"foobar"}"#,
            // value is not quoted
            r#"{job=foobar}"#,
            // missing `,` between pairs
            r#"{job="foobar" cluster="foo-central1"}"#,
        ];
        for labels in invalid_labels {
            let re = parse_loki_labels(labels);
            assert!(
                matches!(re.err().unwrap(), InvalidLokiLabels { .. }),
                "expected InvalidLokiLabels for {labels}"
            );
        }
    }
}