use std::collections::{BTreeMap, HashMap as StdHashMap};

use api::v1::column_data_type_extension::TypeExt;
use api::v1::value::ValueData;
use api::v1::{
    ColumnDataType, ColumnDataTypeExtension, ColumnOptions, ColumnSchema, JsonTypeExtension, Row,
    RowInsertRequest, Rows, SemanticType, Value as GreptimeValue,
};
use bytes::Bytes;
use jsonb::{Number as JsonbNumber, Value as JsonbValue};
use opentelemetry_proto::tonic::collector::logs::v1::ExportLogsServiceRequest;
use opentelemetry_proto::tonic::common::v1::{AnyValue, InstrumentationScope, KeyValue, any_value};
use opentelemetry_proto::tonic::logs::v1::{LogRecord, ResourceLogs, ScopeLogs};
use pipeline::{
    ContextReq, GreptimePipelineParams, PipelineContext, PipelineWay, SchemaInfo, SelectInfo,
};
use session::context::QueryContextRef;
use snafu::ensure;
use vrl::prelude::NotNan;
use vrl::value::{KeyString, Value as VrlValue};

use crate::error::{
    IncompatibleSchemaSnafu, NotSupportedSnafu, Result, UnsupportedJsonDataTypeForTagSnafu,
};
use crate::http::event::PipelineIngestRequest;
use crate::otlp::trace::attributes::OtlpAnyValue;
use crate::otlp::utils::{bytes_to_hex_string, key_value_to_jsonb};
use crate::pipeline::run_pipeline;
use crate::query_handler::PipelineHandlerRef;

pub const LOG_TABLE_NAME: &str = "opentelemetry_logs";

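/// Convert an OTLP `ExportLogsServiceRequest` into row insert requests.
/// `OtlpLogDirect` maps records straight onto the built-in identity schema (plus any
/// selected attributes), while `Pipeline` converts records to VRL values and runs the
/// configured pipeline; any other pipeline way is rejected.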
pub async fn to_grpc_insert_requests(
    request: ExportLogsServiceRequest,
    pipeline: PipelineWay,
    pipeline_params: GreptimePipelineParams,
    table_name: String,
    query_ctx: &QueryContextRef,
    pipeline_handler: PipelineHandlerRef,
) -> Result<ContextReq> {
    match pipeline {
        PipelineWay::OtlpLogDirect(select_info) => {
            let rows = parse_export_logs_service_request_to_rows(request, select_info)?;
            let insert_request = RowInsertRequest {
                rows: Some(rows),
                table_name,
            };

            Ok(ContextReq::default_opt_with_reqs(vec![insert_request]))
        }
        PipelineWay::Pipeline(pipeline_def) => {
            let array = parse_export_logs_service_request(request);

            let pipeline_ctx =
                PipelineContext::new(&pipeline_def, &pipeline_params, query_ctx.channel());
            run_pipeline(
                &pipeline_handler,
                &pipeline_ctx,
                PipelineIngestRequest {
                    table: table_name,
                    values: array,
                },
                query_ctx,
                true,
            )
            .await
        }
        _ => NotSupportedSnafu {
            feat: "Unsupported pipeline for logs",
        }
        .fail(),
    }
}

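/// Flatten an optional `InstrumentationScope` into `(attributes, version, name)`
/// VRL values, falling back to `Null` when the scope is missing.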
fn scope_to_pipeline_value(scope: Option<InstrumentationScope>) -> (VrlValue, VrlValue, VrlValue) {
    scope
        .map(|x| {
            (
                VrlValue::Object(key_value_to_map(x.attributes)),
                VrlValue::Bytes(x.version.into()),
                VrlValue::Bytes(x.name.into()),
            )
        })
        .unwrap_or((VrlValue::Null, VrlValue::Null, VrlValue::Null))
}

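/// Flatten an optional `InstrumentationScope` into its JSONB attributes plus optional
/// version and name strings.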
fn scope_to_jsonb(
    scope: Option<InstrumentationScope>,
) -> (JsonbValue<'static>, Option<String>, Option<String>) {
    scope
        .map(|x| {
            (
                key_value_to_jsonb(x.attributes),
                Some(x.version),
                Some(x.name),
            )
        })
        .unwrap_or((JsonbValue::Null, None, None))
}

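/// Build the VRL object handed to a pipeline for a single `LogRecord`, combining the
/// record's own fields with the surrounding resource and scope metadata.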
fn log_to_pipeline_value(
    log: LogRecord,
    resource_schema_url: VrlValue,
    resource_attr: VrlValue,
    scope_schema_url: VrlValue,
    scope_name: VrlValue,
    scope_version: VrlValue,
    scope_attrs: VrlValue,
) -> VrlValue {
    let log_attrs = VrlValue::Object(key_value_to_map(log.attributes));
    let mut map = BTreeMap::new();
    map.insert(
        "Timestamp".into(),
        VrlValue::Integer(log.time_unix_nano as i64),
    );
    map.insert(
        "ObservedTimestamp".into(),
        VrlValue::Integer(log.observed_time_unix_nano as i64),
    );

    map.insert(
        "TraceId".into(),
        VrlValue::Bytes(bytes_to_hex_string(&log.trace_id).into()),
    );
    map.insert(
        "SpanId".into(),
        VrlValue::Bytes(bytes_to_hex_string(&log.span_id).into()),
    );
    map.insert("TraceFlags".into(), VrlValue::Integer(log.flags as i64));
    map.insert(
        "SeverityText".into(),
        VrlValue::Bytes(log.severity_text.into()),
    );
    map.insert(
        "SeverityNumber".into(),
        VrlValue::Integer(log.severity_number as i64),
    );
    map.insert(
        "Body".into(),
        log.body
            .as_ref()
            .map(|x| VrlValue::Bytes(log_body_to_string(x).into()))
            .unwrap_or(VrlValue::Null),
    );
    map.insert("ResourceSchemaUrl".into(), resource_schema_url);

    map.insert("ResourceAttributes".into(), resource_attr);
    map.insert("ScopeSchemaUrl".into(), scope_schema_url);
    map.insert("ScopeName".into(), scope_name);
    map.insert("ScopeVersion".into(), scope_version);
    map.insert("ScopeAttributes".into(), scope_attrs);
    map.insert("LogAttributes".into(), log_attrs);
    VrlValue::Object(map)
}

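/// Column schemas for the built-in (identity) OTLP logs layout: timestamp, trace/span
/// ids, severity, body (with a fulltext option), JSONB attribute columns, and
/// scope/resource metadata. `scope_name` is the only tag column.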
fn build_otlp_logs_identity_schema() -> Vec<ColumnSchema> {
    [
        (
            "timestamp",
            ColumnDataType::TimestampNanosecond,
            SemanticType::Timestamp,
            None,
            None,
        ),
        (
            "trace_id",
            ColumnDataType::String,
            SemanticType::Field,
            None,
            None,
        ),
        (
            "span_id",
            ColumnDataType::String,
            SemanticType::Field,
            None,
            None,
        ),
        (
            "severity_text",
            ColumnDataType::String,
            SemanticType::Field,
            None,
            None,
        ),
        (
            "severity_number",
            ColumnDataType::Int32,
            SemanticType::Field,
            None,
            None,
        ),
        (
            "body",
            ColumnDataType::String,
            SemanticType::Field,
            None,
            Some(ColumnOptions {
                options: StdHashMap::from([(
                    "fulltext".to_string(),
                    r#"{"enable":true}"#.to_string(),
                )]),
            }),
        ),
        (
            "log_attributes",
            ColumnDataType::Binary,
            SemanticType::Field,
            Some(ColumnDataTypeExtension {
                type_ext: Some(TypeExt::JsonType(JsonTypeExtension::JsonBinary.into())),
            }),
            None,
        ),
        (
            "trace_flags",
            ColumnDataType::Uint32,
            SemanticType::Field,
            None,
            None,
        ),
        (
            "scope_name",
            ColumnDataType::String,
            SemanticType::Tag,
            None,
            None,
        ),
        (
            "scope_version",
            ColumnDataType::String,
            SemanticType::Field,
            None,
            None,
        ),
        (
            "scope_attributes",
            ColumnDataType::Binary,
            SemanticType::Field,
            Some(ColumnDataTypeExtension {
                type_ext: Some(TypeExt::JsonType(JsonTypeExtension::JsonBinary.into())),
            }),
            None,
        ),
        (
            "scope_schema_url",
            ColumnDataType::String,
            SemanticType::Field,
            None,
            None,
        ),
        (
            "resource_attributes",
            ColumnDataType::Binary,
            SemanticType::Field,
            Some(ColumnDataTypeExtension {
                type_ext: Some(TypeExt::JsonType(JsonTypeExtension::JsonBinary.into())),
            }),
            None,
        ),
        (
            "resource_schema_url",
            ColumnDataType::String,
            SemanticType::Field,
            None,
            None,
        ),
    ]
    .into_iter()
    .map(
        |(field_name, column_type, semantic_type, datatype_extension, options)| ColumnSchema {
            column_name: field_name.to_string(),
            datatype: column_type as i32,
            semantic_type: semantic_type as i32,
            datatype_extension,
            options,
        },
    )
    .collect::<Vec<ColumnSchema>>()
}

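/// Build one `Row` matching the identity schema from a `LogRecord`, preferring
/// `time_unix_nano` and falling back to `observed_time_unix_nano` for the timestamp.
/// Also returns the record's attributes as JSONB for later field extraction.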
fn build_otlp_build_in_row(
    log: LogRecord,
    parse_ctx: &mut ParseContext,
) -> (Row, JsonbValue<'static>) {
    let log_attr = key_value_to_jsonb(log.attributes);
    let ts = if log.time_unix_nano != 0 {
        log.time_unix_nano
    } else {
        log.observed_time_unix_nano
    };

    let row = vec![
        GreptimeValue {
            value_data: Some(ValueData::TimestampNanosecondValue(ts as i64)),
        },
        GreptimeValue {
            value_data: Some(ValueData::StringValue(bytes_to_hex_string(&log.trace_id))),
        },
        GreptimeValue {
            value_data: Some(ValueData::StringValue(bytes_to_hex_string(&log.span_id))),
        },
        GreptimeValue {
            value_data: Some(ValueData::StringValue(log.severity_text)),
        },
        GreptimeValue {
            value_data: Some(ValueData::I32Value(log.severity_number)),
        },
        GreptimeValue {
            value_data: log
                .body
                .as_ref()
                .map(|x| ValueData::StringValue(log_body_to_string(x))),
        },
        GreptimeValue {
            value_data: Some(ValueData::BinaryValue(log_attr.to_vec())),
        },
        GreptimeValue {
            value_data: Some(ValueData::U32Value(log.flags)),
        },
        GreptimeValue {
            value_data: parse_ctx.scope_name.clone().map(ValueData::StringValue),
        },
        GreptimeValue {
            value_data: parse_ctx.scope_version.clone().map(ValueData::StringValue),
        },
        GreptimeValue {
            value_data: Some(ValueData::BinaryValue(parse_ctx.scope_attrs.to_vec())),
        },
        GreptimeValue {
            value_data: Some(ValueData::StringValue(parse_ctx.scope_url.clone())),
        },
        GreptimeValue {
            value_data: Some(ValueData::BinaryValue(parse_ctx.resource_attr.to_vec())),
        },
        GreptimeValue {
            value_data: Some(ValueData::StringValue(parse_ctx.resource_url.clone())),
        },
    ];
    (Row { values: row }, log_attr)
}

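/// For each selected key, look it up (case-insensitively) in `attrs`, convert it to a
/// Greptime value, and either fill the existing column (checking type compatibility)
/// or append a new column to `select_schema`.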
fn extract_field_from_attr_and_combine_schema(
    select_info: &SelectInfo,
    select_schema: &mut SchemaInfo,
    attrs: &jsonb::Value,
) -> Result<Vec<GreptimeValue>> {
    let mut extracted_values = vec![GreptimeValue::default(); select_schema.schema.len()];

    for key in select_info.keys.iter() {
        let Some(value) = attrs.get_by_name_ignore_case(key).cloned() else {
            continue;
        };
        let Some((schema, value)) = decide_column_schema_and_convert_value(key, value)? else {
            continue;
        };

        if let Some(index) = select_schema.index.get(key) {
            let column_schema = &select_schema.schema[*index];
            let column_schema: ColumnSchema = column_schema.clone().try_into()?;
            ensure!(
                column_schema.datatype == schema.datatype,
                IncompatibleSchemaSnafu {
                    column_name: key,
                    datatype: column_schema.datatype().as_str_name(),
                    expected: column_schema.datatype,
                    actual: schema.datatype,
                }
            );
            extracted_values[*index] = value;
        } else {
            select_schema.schema.push(schema.into());
            select_schema
                .index
                .insert(key.clone(), select_schema.schema.len() - 1);
            extracted_values.push(value);
        }
    }

    Ok(extracted_values)
}

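/// Map a JSONB attribute value to a tag column schema and value. Strings, integers,
/// and booleans are accepted; floats, arrays, and objects are rejected as tag types,
/// and nulls are skipped.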
fn decide_column_schema_and_convert_value(
    column_name: &str,
    value: JsonbValue,
) -> Result<Option<(ColumnSchema, GreptimeValue)>> {
    let column_info = match value {
        JsonbValue::String(s) => Ok(Some((
            GreptimeValue {
                value_data: Some(ValueData::StringValue(s.into())),
            },
            ColumnDataType::String,
            SemanticType::Tag,
            None,
        ))),
        JsonbValue::Number(n) => match n {
            JsonbNumber::Int64(i) => Ok(Some((
                GreptimeValue {
                    value_data: Some(ValueData::I64Value(i)),
                },
                ColumnDataType::Int64,
                SemanticType::Tag,
                None,
            ))),
            JsonbNumber::Float64(_) => UnsupportedJsonDataTypeForTagSnafu {
                ty: "FLOAT".to_string(),
                key: column_name,
            }
            .fail(),
            JsonbNumber::UInt64(u) => Ok(Some((
                GreptimeValue {
                    value_data: Some(ValueData::U64Value(u)),
                },
                ColumnDataType::Uint64,
                SemanticType::Tag,
                None,
            ))),
        },
        JsonbValue::Bool(b) => Ok(Some((
            GreptimeValue {
                value_data: Some(ValueData::BoolValue(b)),
            },
            ColumnDataType::Boolean,
            SemanticType::Tag,
            None,
        ))),
        JsonbValue::Array(_) | JsonbValue::Object(_) => UnsupportedJsonDataTypeForTagSnafu {
            ty: "Json".to_string(),
            key: column_name,
        }
        .fail(),
        JsonbValue::Null => Ok(None),
    };
    column_info.map(|c| {
        c.map(|(value, column_type, semantic_type, datatype_extension)| {
            (
                ColumnSchema {
                    column_name: column_name.to_string(),
                    datatype: column_type as i32,
                    semantic_type: semantic_type as i32,
                    datatype_extension,
                    options: None,
                },
                value,
            )
        })
    })
}

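/// Convert a whole `ExportLogsServiceRequest` into `Rows` for the identity table, then
/// pad every row to the final schema length once the selected attribute columns are
/// known.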
fn parse_export_logs_service_request_to_rows(
    request: ExportLogsServiceRequest,
    select_info: Box<SelectInfo>,
) -> Result<Rows> {
    let mut schemas = build_otlp_logs_identity_schema();

    let mut parse_ctx = ParseContext::new(select_info);
    let mut rows = parse_resource(&mut parse_ctx, request.resource_logs)?;

    schemas.extend(parse_ctx.select_schema.column_schemas()?);

    rows.iter_mut().for_each(|row| {
        row.values.resize(schemas.len(), GreptimeValue::default());
    });

    Ok(Rows {
        schema: schemas,
        rows,
    })
}

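/// Walk every `ResourceLogs`, caching the resource attributes and their uplifted
/// values in the parse context before descending into its scopes.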
fn parse_resource(
    parse_ctx: &mut ParseContext,
    resource_logs_vec: Vec<ResourceLogs>,
) -> Result<Vec<Row>> {
    let total_len = resource_logs_vec
        .iter()
        .flat_map(|r| r.scope_logs.iter())
        .map(|s| s.log_records.len())
        .sum();

    let mut results = Vec::with_capacity(total_len);

    for r in resource_logs_vec {
        parse_ctx.resource_attr = r
            .resource
            .map(|resource| key_value_to_jsonb(resource.attributes))
            .unwrap_or(JsonbValue::Null);

        parse_ctx.resource_url = r.schema_url;

        parse_ctx.resource_uplift_values = extract_field_from_attr_and_combine_schema(
            &parse_ctx.select_info,
            &mut parse_ctx.select_schema,
            &parse_ctx.resource_attr,
        )?;

        let rows = parse_scope(r.scope_logs, parse_ctx)?;
        results.extend(rows);
    }
    Ok(results)
}

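/// Mutable state threaded through resource/scope/log parsing: the attribute keys to
/// extract, the schema accumulated so far, and the current resource/scope metadata.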
struct ParseContext<'a> {
    select_info: Box<SelectInfo>,
    select_schema: SchemaInfo,

    resource_uplift_values: Vec<GreptimeValue>,
    scope_uplift_values: Vec<GreptimeValue>,

    resource_url: String,
    resource_attr: JsonbValue<'a>,
    scope_name: Option<String>,
    scope_version: Option<String>,
    scope_url: String,
    scope_attrs: JsonbValue<'a>,
}

impl<'a> ParseContext<'a> {
    pub fn new(select_info: Box<SelectInfo>) -> ParseContext<'a> {
        let len = select_info.keys.len();
        ParseContext {
            select_info,
            select_schema: SchemaInfo::with_capacity(len),
            resource_uplift_values: vec![],
            scope_uplift_values: vec![],
            resource_url: String::new(),
            resource_attr: JsonbValue::Null,
            scope_name: None,
            scope_version: None,
            scope_url: String::new(),
            scope_attrs: JsonbValue::Null,
        }
    }
}

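/// Walk every `ScopeLogs`, caching the scope attributes and their uplifted values in
/// the parse context before converting its log records.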
fn parse_scope(scopes_log_vec: Vec<ScopeLogs>, parse_ctx: &mut ParseContext) -> Result<Vec<Row>> {
    let len = scopes_log_vec.iter().map(|l| l.log_records.len()).sum();
    let mut results = Vec::with_capacity(len);

    for scope_logs in scopes_log_vec {
        let (scope_attrs, scope_version, scope_name) = scope_to_jsonb(scope_logs.scope);
        parse_ctx.scope_name = scope_name;
        parse_ctx.scope_version = scope_version;
        parse_ctx.scope_url = scope_logs.schema_url;
        parse_ctx.scope_attrs = scope_attrs;

        parse_ctx.scope_uplift_values = extract_field_from_attr_and_combine_schema(
            &parse_ctx.select_info,
            &mut parse_ctx.select_schema,
            &parse_ctx.scope_attrs,
        )?;

        let rows = parse_log(scope_logs.log_records, parse_ctx)?;
        results.extend(rows);
    }
    Ok(results)
}

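/// Convert each `LogRecord` into a row: the built-in columns first, then the selected
/// attribute values merged from log, scope, and resource attributes.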
fn parse_log(log_records: Vec<LogRecord>, parse_ctx: &mut ParseContext) -> Result<Vec<Row>> {
    let mut result = Vec::with_capacity(log_records.len());

    for log in log_records {
        let (mut row, log_attr) = build_otlp_build_in_row(log, parse_ctx);

        let log_values = extract_field_from_attr_and_combine_schema(
            &parse_ctx.select_info,
            &mut parse_ctx.select_schema,
            &log_attr,
        )?;

        let extracted_values = merge_values(
            log_values,
            &parse_ctx.scope_uplift_values,
            &parse_ctx.resource_uplift_values,
        );

        row.values.extend(extracted_values);

        result.push(row);
    }
    Ok(result)
}

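/// Merge uplifted values by precedence: a log attribute wins over the scope attribute
/// for the same column, which in turn wins over the resource attribute.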
fn merge_values(
    log: Vec<GreptimeValue>,
    scope: &[GreptimeValue],
    resource: &[GreptimeValue],
) -> Vec<GreptimeValue> {
    log.into_iter()
        .enumerate()
        .map(|(i, value)| GreptimeValue {
            value_data: value
                .value_data
                .or_else(|| scope.get(i).and_then(|x| x.value_data.clone()))
                .or_else(|| resource.get(i).and_then(|x| x.value_data.clone())),
        })
        .collect()
}

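/// Flatten an `ExportLogsServiceRequest` into one VRL value per log record for
/// pipeline processing, duplicating the resource and scope metadata onto each record.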
fn parse_export_logs_service_request(request: ExportLogsServiceRequest) -> Vec<VrlValue> {
    let mut result = Vec::new();
    for r in request.resource_logs {
        let resource_attr = r
            .resource
            .map(|x| VrlValue::Object(key_value_to_map(x.attributes)))
            .unwrap_or(VrlValue::Null);
        let resource_schema_url = VrlValue::Bytes(r.schema_url.into());
        for scope_logs in r.scope_logs {
            let (scope_attrs, scope_version, scope_name) =
                scope_to_pipeline_value(scope_logs.scope);
            let scope_schema_url = VrlValue::Bytes(scope_logs.schema_url.into());
            for log in scope_logs.log_records {
                let value = log_to_pipeline_value(
                    log,
                    resource_schema_url.clone(),
                    resource_attr.clone(),
                    scope_schema_url.clone(),
                    scope_name.clone(),
                    scope_version.clone(),
                    scope_attrs.clone(),
                );
                result.push(value);
            }
        }
    }
    result
}

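/// Recursively convert an OTLP `AnyValue` variant into the corresponding VRL value.
/// Note that a NaN `DoubleValue` panics here via `NotNan::new(d).unwrap()`.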
fn any_value_to_vrl_value(value: any_value::Value) -> VrlValue {
    match value {
        any_value::Value::StringValue(s) => VrlValue::Bytes(s.into()),
        any_value::Value::IntValue(i) => VrlValue::Integer(i),
        any_value::Value::DoubleValue(d) => VrlValue::Float(NotNan::new(d).unwrap()),
        any_value::Value::BoolValue(b) => VrlValue::Boolean(b),
        any_value::Value::ArrayValue(array_value) => {
            let values = array_value
                .values
                .into_iter()
                .filter_map(|v| v.value.map(any_value_to_vrl_value))
                .collect();
            VrlValue::Array(values)
        }
        any_value::Value::KvlistValue(key_value_list) => {
            VrlValue::Object(key_value_to_map(key_value_list.values))
        }
        any_value::Value::BytesValue(items) => VrlValue::Bytes(Bytes::from(items)),
    }
}

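/// Convert OTLP key-value attributes into a VRL object map, mapping missing values to
/// `Null`.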
fn key_value_to_map(key_values: Vec<KeyValue>) -> BTreeMap<KeyString, VrlValue> {
    let mut map = BTreeMap::new();
    for kv in key_values {
        let value = match kv.value {
            Some(value) => match value.value {
                Some(value) => any_value_to_vrl_value(value),
                None => VrlValue::Null,
            },
            None => VrlValue::Null,
        };
        map.insert(kv.key.into(), value);
    }
    map
}

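/// Render a log body `AnyValue` as a string using the `OtlpAnyValue` wrapper's string
/// representation.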
fn log_body_to_string(body: &AnyValue) -> String {
    let otlp_value = OtlpAnyValue::from(body);
    otlp_value.to_string()
}