1use std::collections::{BTreeMap, HashMap as StdHashMap};
16
17use api::v1::column_data_type_extension::TypeExt;
18use api::v1::value::ValueData;
19use api::v1::{
20 ColumnDataType, ColumnDataTypeExtension, ColumnOptions, ColumnSchema, JsonTypeExtension, Row,
21 RowInsertRequest, Rows, SemanticType, Value as GreptimeValue,
22};
23use bytes::Bytes;
24use jsonb::{Number as JsonbNumber, Value as JsonbValue};
25use opentelemetry_proto::tonic::collector::logs::v1::ExportLogsServiceRequest;
26use opentelemetry_proto::tonic::common::v1::{any_value, AnyValue, InstrumentationScope, KeyValue};
27use opentelemetry_proto::tonic::logs::v1::{LogRecord, ResourceLogs, ScopeLogs};
28use pipeline::{
29 ContextReq, GreptimePipelineParams, PipelineContext, PipelineWay, SchemaInfo, SelectInfo,
30};
31use session::context::QueryContextRef;
32use snafu::ensure;
33use vrl::prelude::NotNan;
34use vrl::value::{KeyString, Value as VrlValue};
35
36use crate::error::{
37 IncompatibleSchemaSnafu, NotSupportedSnafu, Result, UnsupportedJsonDataTypeForTagSnafu,
38};
39use crate::http::event::PipelineIngestRequest;
40use crate::otlp::trace::attributes::OtlpAnyValue;
41use crate::otlp::utils::{bytes_to_hex_string, key_value_to_jsonb};
42use crate::pipeline::run_pipeline;
43use crate::query_handler::PipelineHandlerRef;
44
/// Default table name for ingested OpenTelemetry log records.
pub const LOG_TABLE_NAME: &str = "opentelemetry_logs";
46
47pub async fn to_grpc_insert_requests(
55 request: ExportLogsServiceRequest,
56 pipeline: PipelineWay,
57 pipeline_params: GreptimePipelineParams,
58 table_name: String,
59 query_ctx: &QueryContextRef,
60 pipeline_handler: PipelineHandlerRef,
61) -> Result<ContextReq> {
62 match pipeline {
63 PipelineWay::OtlpLogDirect(select_info) => {
64 let rows = parse_export_logs_service_request_to_rows(request, select_info)?;
65 let insert_request = RowInsertRequest {
66 rows: Some(rows),
67 table_name,
68 };
69
70 Ok(ContextReq::default_opt_with_reqs(vec![insert_request]))
71 }
72 PipelineWay::Pipeline(pipeline_def) => {
73 let array = parse_export_logs_service_request(request);
74
75 let pipeline_ctx =
76 PipelineContext::new(&pipeline_def, &pipeline_params, query_ctx.channel());
77 run_pipeline(
78 &pipeline_handler,
79 &pipeline_ctx,
80 PipelineIngestRequest {
81 table: table_name,
82 values: array,
83 },
84 query_ctx,
85 true,
86 )
87 .await
88 }
89 _ => NotSupportedSnafu {
90 feat: "Unsupported pipeline for logs",
91 }
92 .fail(),
93 }
94}
95
96fn scope_to_pipeline_value(scope: Option<InstrumentationScope>) -> (VrlValue, VrlValue, VrlValue) {
97 scope
98 .map(|x| {
99 (
100 VrlValue::Object(key_value_to_map(x.attributes)),
101 VrlValue::Bytes(x.version.into()),
102 VrlValue::Bytes(x.name.into()),
103 )
104 })
105 .unwrap_or((VrlValue::Null, VrlValue::Null, VrlValue::Null))
106}
107
108fn scope_to_jsonb(
109 scope: Option<InstrumentationScope>,
110) -> (JsonbValue<'static>, Option<String>, Option<String>) {
111 scope
112 .map(|x| {
113 (
114 key_value_to_jsonb(x.attributes),
115 Some(x.version),
116 Some(x.name),
117 )
118 })
119 .unwrap_or((JsonbValue::Null, None, None))
120}
121
122fn log_to_pipeline_value(
123 log: LogRecord,
124 resource_schema_url: VrlValue,
125 resource_attr: VrlValue,
126 scope_schema_url: VrlValue,
127 scope_name: VrlValue,
128 scope_version: VrlValue,
129 scope_attrs: VrlValue,
130) -> VrlValue {
131 let log_attrs = VrlValue::Object(key_value_to_map(log.attributes));
132 let mut map = BTreeMap::new();
133 map.insert(
134 "Timestamp".into(),
135 VrlValue::Integer(log.time_unix_nano as i64),
136 );
137 map.insert(
138 "ObservedTimestamp".into(),
139 VrlValue::Integer(log.observed_time_unix_nano as i64),
140 );
141
142 map.insert(
144 "TraceId".into(),
145 VrlValue::Bytes(bytes_to_hex_string(&log.trace_id).into()),
146 );
147 map.insert(
148 "SpanId".into(),
149 VrlValue::Bytes(bytes_to_hex_string(&log.span_id).into()),
150 );
151 map.insert("TraceFlags".into(), VrlValue::Integer(log.flags as i64));
152 map.insert(
153 "SeverityText".into(),
154 VrlValue::Bytes(log.severity_text.into()),
155 );
156 map.insert(
157 "SeverityNumber".into(),
158 VrlValue::Integer(log.severity_number as i64),
159 );
160 map.insert(
162 "Body".into(),
163 log.body
164 .as_ref()
165 .map(|x| VrlValue::Bytes(log_body_to_string(x).into()))
166 .unwrap_or(VrlValue::Null),
167 );
168 map.insert("ResourceSchemaUrl".into(), resource_schema_url);
169
170 map.insert("ResourceAttributes".into(), resource_attr);
171 map.insert("ScopeSchemaUrl".into(), scope_schema_url);
172 map.insert("ScopeName".into(), scope_name);
173 map.insert("ScopeVersion".into(), scope_version);
174 map.insert("ScopeAttributes".into(), scope_attrs);
175 map.insert("LogAttributes".into(), log_attrs);
176 VrlValue::Object(map)
177}
178
179fn build_otlp_logs_identity_schema() -> Vec<ColumnSchema> {
180 [
181 (
182 "timestamp",
183 ColumnDataType::TimestampNanosecond,
184 SemanticType::Timestamp,
185 None,
186 None,
187 ),
188 (
189 "trace_id",
190 ColumnDataType::String,
191 SemanticType::Field,
192 None,
193 None,
194 ),
195 (
196 "span_id",
197 ColumnDataType::String,
198 SemanticType::Field,
199 None,
200 None,
201 ),
202 (
203 "severity_text",
204 ColumnDataType::String,
205 SemanticType::Field,
206 None,
207 None,
208 ),
209 (
210 "severity_number",
211 ColumnDataType::Int32,
212 SemanticType::Field,
213 None,
214 None,
215 ),
216 (
217 "body",
218 ColumnDataType::String,
219 SemanticType::Field,
220 None,
221 Some(ColumnOptions {
222 options: StdHashMap::from([(
223 "fulltext".to_string(),
224 r#"{"enable":true}"#.to_string(),
225 )]),
226 }),
227 ),
228 (
229 "log_attributes",
230 ColumnDataType::Binary,
231 SemanticType::Field,
232 Some(ColumnDataTypeExtension {
233 type_ext: Some(TypeExt::JsonType(JsonTypeExtension::JsonBinary.into())),
234 }),
235 None,
236 ),
237 (
238 "trace_flags",
239 ColumnDataType::Uint32,
240 SemanticType::Field,
241 None,
242 None,
243 ),
244 (
245 "scope_name",
246 ColumnDataType::String,
247 SemanticType::Tag,
248 None,
249 None,
250 ),
251 (
252 "scope_version",
253 ColumnDataType::String,
254 SemanticType::Field,
255 None,
256 None,
257 ),
258 (
259 "scope_attributes",
260 ColumnDataType::Binary,
261 SemanticType::Field,
262 Some(ColumnDataTypeExtension {
263 type_ext: Some(TypeExt::JsonType(JsonTypeExtension::JsonBinary.into())),
264 }),
265 None,
266 ),
267 (
268 "scope_schema_url",
269 ColumnDataType::String,
270 SemanticType::Field,
271 None,
272 None,
273 ),
274 (
275 "resource_attributes",
276 ColumnDataType::Binary,
277 SemanticType::Field,
278 Some(ColumnDataTypeExtension {
279 type_ext: Some(TypeExt::JsonType(JsonTypeExtension::JsonBinary.into())),
280 }),
281 None,
282 ),
283 (
284 "resource_schema_url",
285 ColumnDataType::String,
286 SemanticType::Field,
287 None,
288 None,
289 ),
290 ]
291 .into_iter()
292 .map(
293 |(field_name, column_type, semantic_type, datatype_extension, options)| ColumnSchema {
294 column_name: field_name.to_string(),
295 datatype: column_type as i32,
296 semantic_type: semantic_type as i32,
297 datatype_extension,
298 options,
299 },
300 )
301 .collect::<Vec<ColumnSchema>>()
302}
303
/// Builds one `Row` for the direct (no-pipeline) path from a single OTLP log
/// record, also returning the record's attributes as jsonb so the caller can
/// uplift selected fields from them.
///
/// NOTE: the value order below must stay in sync with the column order of
/// `build_otlp_logs_identity_schema`.
fn build_otlp_build_in_row(
    log: LogRecord,
    parse_ctx: &mut ParseContext,
) -> (Row, JsonbValue<'static>) {
    let log_attr = key_value_to_jsonb(log.attributes);
    // Prefer the producer-supplied timestamp; fall back to the time the
    // record was observed when it is unset (0).
    let ts = if log.time_unix_nano != 0 {
        log.time_unix_nano
    } else {
        log.observed_time_unix_nano
    };

    let row = vec![
        GreptimeValue {
            value_data: Some(ValueData::TimestampNanosecondValue(ts as i64)),
        },
        GreptimeValue {
            value_data: Some(ValueData::StringValue(bytes_to_hex_string(&log.trace_id))),
        },
        GreptimeValue {
            value_data: Some(ValueData::StringValue(bytes_to_hex_string(&log.span_id))),
        },
        GreptimeValue {
            value_data: Some(ValueData::StringValue(log.severity_text)),
        },
        GreptimeValue {
            value_data: Some(ValueData::I32Value(log.severity_number)),
        },
        // Body is optional; a missing body yields a null column value.
        GreptimeValue {
            value_data: log
                .body
                .as_ref()
                .map(|x| ValueData::StringValue(log_body_to_string(x))),
        },
        GreptimeValue {
            value_data: Some(ValueData::BinaryValue(log_attr.to_vec())),
        },
        GreptimeValue {
            value_data: Some(ValueData::U32Value(log.flags)),
        },
        // Scope/resource metadata comes from the surrounding parse context,
        // set by parse_scope / parse_resource before each record is handled.
        GreptimeValue {
            value_data: parse_ctx.scope_name.clone().map(ValueData::StringValue),
        },
        GreptimeValue {
            value_data: parse_ctx.scope_version.clone().map(ValueData::StringValue),
        },
        GreptimeValue {
            value_data: Some(ValueData::BinaryValue(parse_ctx.scope_attrs.to_vec())),
        },
        GreptimeValue {
            value_data: Some(ValueData::StringValue(parse_ctx.scope_url.clone())),
        },
        GreptimeValue {
            value_data: Some(ValueData::BinaryValue(parse_ctx.resource_attr.to_vec())),
        },
        GreptimeValue {
            value_data: Some(ValueData::StringValue(parse_ctx.resource_url.clone())),
        },
    ];
    (Row { values: row }, log_attr)
}
364
/// For every key requested in `select_info`, looks up a matching attribute
/// (case-insensitively) in `attrs`, converts it to a tag column, and returns
/// the extracted values positionally aligned with `select_schema.schema`.
///
/// `select_schema` is shared across resource/scope/log calls and only grows:
/// a key seen for the first time appends a new column, and its value is
/// appended at the matching position. Keys absent from `attrs` (or with
/// null/unconvertible-null values) leave a default (null) `GreptimeValue`.
///
/// Returns an error if the same key reappears with a different data type.
fn extract_field_from_attr_and_combine_schema(
    select_info: &SelectInfo,
    select_schema: &mut SchemaInfo,
    attrs: &jsonb::Value,
) -> Result<Vec<GreptimeValue>> {
    // One slot per already-known column; new columns are pushed below,
    // keeping this vector and select_schema.schema in lockstep.
    let mut extracted_values = vec![GreptimeValue::default(); select_schema.schema.len()];

    for key in select_info.keys.iter() {
        let Some(value) = attrs.get_by_name_ignore_case(key).cloned() else {
            continue;
        };
        // Null values convert to None and are skipped.
        let Some((schema, value)) = decide_column_schema_and_convert_value(key, value)? else {
            continue;
        };

        if let Some(index) = select_schema.index.get(key) {
            let column_schema = &select_schema.schema[*index];
            // A key must keep the same datatype across all records.
            ensure!(
                column_schema.datatype == schema.datatype,
                IncompatibleSchemaSnafu {
                    column_name: key,
                    datatype: column_schema.datatype().as_str_name(),
                    expected: column_schema.datatype,
                    actual: schema.datatype,
                }
            );
            extracted_values[*index] = value;
        } else {
            select_schema.schema.push(schema);
            select_schema
                .index
                .insert(key.clone(), select_schema.schema.len() - 1);
            extracted_values.push(value);
        }
    }

    Ok(extracted_values)
}
406
407fn decide_column_schema_and_convert_value(
408 column_name: &str,
409 value: JsonbValue,
410) -> Result<Option<(ColumnSchema, GreptimeValue)>> {
411 let column_info = match value {
412 JsonbValue::String(s) => Ok(Some((
413 GreptimeValue {
414 value_data: Some(ValueData::StringValue(s.into())),
415 },
416 ColumnDataType::String,
417 SemanticType::Tag,
418 None,
419 ))),
420 JsonbValue::Number(n) => match n {
421 JsonbNumber::Int64(i) => Ok(Some((
422 GreptimeValue {
423 value_data: Some(ValueData::I64Value(i)),
424 },
425 ColumnDataType::Int64,
426 SemanticType::Tag,
427 None,
428 ))),
429 JsonbNumber::Float64(_) => UnsupportedJsonDataTypeForTagSnafu {
430 ty: "FLOAT".to_string(),
431 key: column_name,
432 }
433 .fail(),
434 JsonbNumber::UInt64(u) => Ok(Some((
435 GreptimeValue {
436 value_data: Some(ValueData::U64Value(u)),
437 },
438 ColumnDataType::Uint64,
439 SemanticType::Tag,
440 None,
441 ))),
442 },
443 JsonbValue::Bool(b) => Ok(Some((
444 GreptimeValue {
445 value_data: Some(ValueData::BoolValue(b)),
446 },
447 ColumnDataType::Boolean,
448 SemanticType::Tag,
449 None,
450 ))),
451 JsonbValue::Array(_) | JsonbValue::Object(_) => UnsupportedJsonDataTypeForTagSnafu {
452 ty: "Json".to_string(),
453 key: column_name,
454 }
455 .fail(),
456 JsonbValue::Null => Ok(None),
457 };
458 column_info.map(|c| {
459 c.map(|(value, column_type, semantic_type, datatype_extension)| {
460 (
461 ColumnSchema {
462 column_name: column_name.to_string(),
463 datatype: column_type as i32,
464 semantic_type: semantic_type as i32,
465 datatype_extension,
466 options: None,
467 },
468 value,
469 )
470 })
471 })
472}
473
474fn parse_export_logs_service_request_to_rows(
475 request: ExportLogsServiceRequest,
476 select_info: Box<SelectInfo>,
477) -> Result<Rows> {
478 let mut schemas = build_otlp_logs_identity_schema();
479
480 let mut parse_ctx = ParseContext::new(select_info);
481 let mut rows = parse_resource(&mut parse_ctx, request.resource_logs)?;
482
483 schemas.extend(parse_ctx.select_schema.schema);
484
485 rows.iter_mut().for_each(|row| {
486 row.values.resize(schemas.len(), GreptimeValue::default());
487 });
488
489 Ok(Rows {
490 schema: schemas,
491 rows,
492 })
493}
494
495fn parse_resource(
496 parse_ctx: &mut ParseContext,
497 resource_logs_vec: Vec<ResourceLogs>,
498) -> Result<Vec<Row>> {
499 let total_len = resource_logs_vec
500 .iter()
501 .flat_map(|r| r.scope_logs.iter())
502 .map(|s| s.log_records.len())
503 .sum();
504
505 let mut results = Vec::with_capacity(total_len);
506
507 for r in resource_logs_vec {
508 parse_ctx.resource_attr = r
509 .resource
510 .map(|resource| key_value_to_jsonb(resource.attributes))
511 .unwrap_or(JsonbValue::Null);
512
513 parse_ctx.resource_url = r.schema_url;
514
515 parse_ctx.resource_uplift_values = extract_field_from_attr_and_combine_schema(
516 &parse_ctx.select_info,
517 &mut parse_ctx.select_schema,
518 &parse_ctx.resource_attr,
519 )?;
520
521 let rows = parse_scope(r.scope_logs, parse_ctx)?;
522 results.extend(rows);
523 }
524 Ok(results)
525}
526
/// Mutable state threaded through resource → scope → log parsing for the
/// direct (no-pipeline) path.
struct ParseContext<'a> {
    // Attribute keys the caller asked to uplift into dedicated columns.
    select_info: Box<SelectInfo>,
    // Accumulated schema of the uplifted columns, shared across all levels.
    select_schema: SchemaInfo,

    // Values uplifted from the current resource's / scope's attributes,
    // positionally aligned with `select_schema.schema`.
    resource_uplift_values: Vec<GreptimeValue>,
    scope_uplift_values: Vec<GreptimeValue>,

    // Metadata of the resource / scope currently being parsed.
    resource_url: String,
    resource_attr: JsonbValue<'a>,
    scope_name: Option<String>,
    scope_version: Option<String>,
    scope_url: String,
    scope_attrs: JsonbValue<'a>,
}
546
impl<'a> ParseContext<'a> {
    /// Creates an empty context; `select_info` determines which attribute
    /// keys get uplifted into their own columns during parsing.
    pub fn new(select_info: Box<SelectInfo>) -> ParseContext<'a> {
        // Pre-size the uplift schema for one column per selected key.
        let len = select_info.keys.len();
        ParseContext {
            select_info,
            select_schema: SchemaInfo::with_capacity(len),
            resource_uplift_values: vec![],
            scope_uplift_values: vec![],
            resource_url: String::new(),
            resource_attr: JsonbValue::Null,
            scope_name: None,
            scope_version: None,
            scope_url: String::new(),
            scope_attrs: JsonbValue::Null,
        }
    }
}
564
565fn parse_scope(scopes_log_vec: Vec<ScopeLogs>, parse_ctx: &mut ParseContext) -> Result<Vec<Row>> {
566 let len = scopes_log_vec.iter().map(|l| l.log_records.len()).sum();
567 let mut results = Vec::with_capacity(len);
568
569 for scope_logs in scopes_log_vec {
570 let (scope_attrs, scope_version, scope_name) = scope_to_jsonb(scope_logs.scope);
571 parse_ctx.scope_name = scope_name;
572 parse_ctx.scope_version = scope_version;
573 parse_ctx.scope_url = scope_logs.schema_url;
574 parse_ctx.scope_attrs = scope_attrs;
575
576 parse_ctx.scope_uplift_values = extract_field_from_attr_and_combine_schema(
577 &parse_ctx.select_info,
578 &mut parse_ctx.select_schema,
579 &parse_ctx.scope_attrs,
580 )?;
581
582 let rows = parse_log(scope_logs.log_records, parse_ctx)?;
583 results.extend(rows);
584 }
585 Ok(results)
586}
587
588fn parse_log(log_records: Vec<LogRecord>, parse_ctx: &mut ParseContext) -> Result<Vec<Row>> {
589 let mut result = Vec::with_capacity(log_records.len());
590
591 for log in log_records {
592 let (mut row, log_attr) = build_otlp_build_in_row(log, parse_ctx);
593
594 let log_values = extract_field_from_attr_and_combine_schema(
595 &parse_ctx.select_info,
596 &mut parse_ctx.select_schema,
597 &log_attr,
598 )?;
599
600 let extracted_values = merge_values(
601 log_values,
602 &parse_ctx.scope_uplift_values,
603 &parse_ctx.resource_uplift_values,
604 );
605
606 row.values.extend(extracted_values);
607
608 result.push(row);
609 }
610 Ok(result)
611}
612
613fn merge_values(
614 log: Vec<GreptimeValue>,
615 scope: &[GreptimeValue],
616 resource: &[GreptimeValue],
617) -> Vec<GreptimeValue> {
618 log.into_iter()
619 .enumerate()
620 .map(|(i, value)| GreptimeValue {
621 value_data: value
622 .value_data
623 .or_else(|| scope.get(i).and_then(|x| x.value_data.clone()))
624 .or_else(|| resource.get(i).and_then(|x| x.value_data.clone())),
625 })
626 .collect()
627}
628
629fn parse_export_logs_service_request(request: ExportLogsServiceRequest) -> Vec<VrlValue> {
632 let mut result = Vec::new();
633 for r in request.resource_logs {
634 let resource_attr = r
635 .resource
636 .map(|x| VrlValue::Object(key_value_to_map(x.attributes)))
637 .unwrap_or(VrlValue::Null);
638 let resource_schema_url = VrlValue::Bytes(r.schema_url.into());
639 for scope_logs in r.scope_logs {
640 let (scope_attrs, scope_version, scope_name) =
641 scope_to_pipeline_value(scope_logs.scope);
642 let scope_schema_url = VrlValue::Bytes(scope_logs.schema_url.into());
643 for log in scope_logs.log_records {
644 let value = log_to_pipeline_value(
645 log,
646 resource_schema_url.clone(),
647 resource_attr.clone(),
648 scope_schema_url.clone(),
649 scope_name.clone(),
650 scope_version.clone(),
651 scope_attrs.clone(),
652 );
653 result.push(value);
654 }
655 }
656 }
657 result
658}
659
660fn any_value_to_vrl_value(value: any_value::Value) -> VrlValue {
662 match value {
663 any_value::Value::StringValue(s) => VrlValue::Bytes(s.into()),
664 any_value::Value::IntValue(i) => VrlValue::Integer(i),
665 any_value::Value::DoubleValue(d) => VrlValue::Float(NotNan::new(d).unwrap()),
666 any_value::Value::BoolValue(b) => VrlValue::Boolean(b),
667 any_value::Value::ArrayValue(array_value) => {
668 let values = array_value
669 .values
670 .into_iter()
671 .filter_map(|v| v.value.map(any_value_to_vrl_value))
672 .collect();
673 VrlValue::Array(values)
674 }
675 any_value::Value::KvlistValue(key_value_list) => {
676 VrlValue::Object(key_value_to_map(key_value_list.values))
677 }
678 any_value::Value::BytesValue(items) => VrlValue::Bytes(Bytes::from(items)),
679 }
680}
681
682fn key_value_to_map(key_values: Vec<KeyValue>) -> BTreeMap<KeyString, VrlValue> {
684 let mut map = BTreeMap::new();
685 for kv in key_values {
686 let value = match kv.value {
687 Some(value) => match value.value {
688 Some(value) => any_value_to_vrl_value(value),
689 None => VrlValue::Null,
690 },
691 None => VrlValue::Null,
692 };
693 map.insert(kv.key.into(), value);
694 }
695 map
696}
697
698fn log_body_to_string(body: &AnyValue) -> String {
699 let otlp_value = OtlpAnyValue::from(body);
700 otlp_value.to_string()
701}