servers/otlp/
logs.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, HashMap as StdHashMap};
16
17use api::v1::column_data_type_extension::TypeExt;
18use api::v1::value::ValueData;
19use api::v1::{
20    ColumnDataType, ColumnDataTypeExtension, ColumnOptions, ColumnSchema, JsonTypeExtension, Row,
21    RowInsertRequest, Rows, SemanticType, Value as GreptimeValue,
22};
23use bytes::Bytes;
24use jsonb::{Number as JsonbNumber, Value as JsonbValue};
25use opentelemetry_proto::tonic::collector::logs::v1::ExportLogsServiceRequest;
26use opentelemetry_proto::tonic::common::v1::{any_value, AnyValue, InstrumentationScope, KeyValue};
27use opentelemetry_proto::tonic::logs::v1::{LogRecord, ResourceLogs, ScopeLogs};
28use pipeline::{
29    ContextReq, GreptimePipelineParams, PipelineContext, PipelineWay, SchemaInfo, SelectInfo,
30};
31use session::context::QueryContextRef;
32use snafu::ensure;
33use vrl::prelude::NotNan;
34use vrl::value::{KeyString, Value as VrlValue};
35
36use crate::error::{
37    IncompatibleSchemaSnafu, NotSupportedSnafu, Result, UnsupportedJsonDataTypeForTagSnafu,
38};
39use crate::http::event::PipelineIngestRequest;
40use crate::otlp::trace::attributes::OtlpAnyValue;
41use crate::otlp::utils::{bytes_to_hex_string, key_value_to_jsonb};
42use crate::pipeline::run_pipeline;
43use crate::query_handler::PipelineHandlerRef;
44
/// Table name `"opentelemetry_logs"` for OpenTelemetry log data.
///
/// NOTE(review): presumably the default destination table when the caller does not supply
/// one; this file never reads the constant itself, so confirm against its users.
45pub const LOG_TABLE_NAME: &str = "opentelemetry_logs";
46
47/// Convert OpenTelemetry metrics to GreptimeDB insert requests
48///
49/// See
50/// <https://github.com/open-telemetry/opentelemetry-proto/blob/main/opentelemetry/proto/metrics/v1/metrics.proto>
51/// for data structure of OTLP metrics.
52///
53/// Returns `InsertRequests` and total number of rows to ingest
54pub async fn to_grpc_insert_requests(
55    request: ExportLogsServiceRequest,
56    pipeline: PipelineWay,
57    pipeline_params: GreptimePipelineParams,
58    table_name: String,
59    query_ctx: &QueryContextRef,
60    pipeline_handler: PipelineHandlerRef,
61) -> Result<ContextReq> {
62    match pipeline {
63        PipelineWay::OtlpLogDirect(select_info) => {
64            let rows = parse_export_logs_service_request_to_rows(request, select_info)?;
65            let insert_request = RowInsertRequest {
66                rows: Some(rows),
67                table_name,
68            };
69
70            Ok(ContextReq::default_opt_with_reqs(vec![insert_request]))
71        }
72        PipelineWay::Pipeline(pipeline_def) => {
73            let array = parse_export_logs_service_request(request);
74
75            let pipeline_ctx =
76                PipelineContext::new(&pipeline_def, &pipeline_params, query_ctx.channel());
77            run_pipeline(
78                &pipeline_handler,
79                &pipeline_ctx,
80                PipelineIngestRequest {
81                    table: table_name,
82                    values: array,
83                },
84                query_ctx,
85                true,
86            )
87            .await
88        }
89        _ => NotSupportedSnafu {
90            feat: "Unsupported pipeline for logs",
91        }
92        .fail(),
93    }
94}
95
96fn scope_to_pipeline_value(scope: Option<InstrumentationScope>) -> (VrlValue, VrlValue, VrlValue) {
97    scope
98        .map(|x| {
99            (
100                VrlValue::Object(key_value_to_map(x.attributes)),
101                VrlValue::Bytes(x.version.into()),
102                VrlValue::Bytes(x.name.into()),
103            )
104        })
105        .unwrap_or((VrlValue::Null, VrlValue::Null, VrlValue::Null))
106}
107
108fn scope_to_jsonb(
109    scope: Option<InstrumentationScope>,
110) -> (JsonbValue<'static>, Option<String>, Option<String>) {
111    scope
112        .map(|x| {
113            (
114                key_value_to_jsonb(x.attributes),
115                Some(x.version),
116                Some(x.name),
117            )
118        })
119        .unwrap_or((JsonbValue::Null, None, None))
120}
121
122fn log_to_pipeline_value(
123    log: LogRecord,
124    resource_schema_url: VrlValue,
125    resource_attr: VrlValue,
126    scope_schema_url: VrlValue,
127    scope_name: VrlValue,
128    scope_version: VrlValue,
129    scope_attrs: VrlValue,
130) -> VrlValue {
131    let log_attrs = VrlValue::Object(key_value_to_map(log.attributes));
132    let mut map = BTreeMap::new();
133    map.insert(
134        "Timestamp".into(),
135        VrlValue::Integer(log.time_unix_nano as i64),
136    );
137    map.insert(
138        "ObservedTimestamp".into(),
139        VrlValue::Integer(log.observed_time_unix_nano as i64),
140    );
141
142    // need to be convert to string
143    map.insert(
144        "TraceId".into(),
145        VrlValue::Bytes(bytes_to_hex_string(&log.trace_id).into()),
146    );
147    map.insert(
148        "SpanId".into(),
149        VrlValue::Bytes(bytes_to_hex_string(&log.span_id).into()),
150    );
151    map.insert("TraceFlags".into(), VrlValue::Integer(log.flags as i64));
152    map.insert(
153        "SeverityText".into(),
154        VrlValue::Bytes(log.severity_text.into()),
155    );
156    map.insert(
157        "SeverityNumber".into(),
158        VrlValue::Integer(log.severity_number as i64),
159    );
160    // need to be convert to string
161    map.insert(
162        "Body".into(),
163        log.body
164            .as_ref()
165            .map(|x| VrlValue::Bytes(log_body_to_string(x).into()))
166            .unwrap_or(VrlValue::Null),
167    );
168    map.insert("ResourceSchemaUrl".into(), resource_schema_url);
169
170    map.insert("ResourceAttributes".into(), resource_attr);
171    map.insert("ScopeSchemaUrl".into(), scope_schema_url);
172    map.insert("ScopeName".into(), scope_name);
173    map.insert("ScopeVersion".into(), scope_version);
174    map.insert("ScopeAttributes".into(), scope_attrs);
175    map.insert("LogAttributes".into(), log_attrs);
176    VrlValue::Object(map)
177}
178
179fn build_otlp_logs_identity_schema() -> Vec<ColumnSchema> {
180    [
181        (
182            "timestamp",
183            ColumnDataType::TimestampNanosecond,
184            SemanticType::Timestamp,
185            None,
186            None,
187        ),
188        (
189            "trace_id",
190            ColumnDataType::String,
191            SemanticType::Field,
192            None,
193            None,
194        ),
195        (
196            "span_id",
197            ColumnDataType::String,
198            SemanticType::Field,
199            None,
200            None,
201        ),
202        (
203            "severity_text",
204            ColumnDataType::String,
205            SemanticType::Field,
206            None,
207            None,
208        ),
209        (
210            "severity_number",
211            ColumnDataType::Int32,
212            SemanticType::Field,
213            None,
214            None,
215        ),
216        (
217            "body",
218            ColumnDataType::String,
219            SemanticType::Field,
220            None,
221            Some(ColumnOptions {
222                options: StdHashMap::from([(
223                    "fulltext".to_string(),
224                    r#"{"enable":true}"#.to_string(),
225                )]),
226            }),
227        ),
228        (
229            "log_attributes",
230            ColumnDataType::Binary,
231            SemanticType::Field,
232            Some(ColumnDataTypeExtension {
233                type_ext: Some(TypeExt::JsonType(JsonTypeExtension::JsonBinary.into())),
234            }),
235            None,
236        ),
237        (
238            "trace_flags",
239            ColumnDataType::Uint32,
240            SemanticType::Field,
241            None,
242            None,
243        ),
244        (
245            "scope_name",
246            ColumnDataType::String,
247            SemanticType::Tag,
248            None,
249            None,
250        ),
251        (
252            "scope_version",
253            ColumnDataType::String,
254            SemanticType::Field,
255            None,
256            None,
257        ),
258        (
259            "scope_attributes",
260            ColumnDataType::Binary,
261            SemanticType::Field,
262            Some(ColumnDataTypeExtension {
263                type_ext: Some(TypeExt::JsonType(JsonTypeExtension::JsonBinary.into())),
264            }),
265            None,
266        ),
267        (
268            "scope_schema_url",
269            ColumnDataType::String,
270            SemanticType::Field,
271            None,
272            None,
273        ),
274        (
275            "resource_attributes",
276            ColumnDataType::Binary,
277            SemanticType::Field,
278            Some(ColumnDataTypeExtension {
279                type_ext: Some(TypeExt::JsonType(JsonTypeExtension::JsonBinary.into())),
280            }),
281            None,
282        ),
283        (
284            "resource_schema_url",
285            ColumnDataType::String,
286            SemanticType::Field,
287            None,
288            None,
289        ),
290    ]
291    .into_iter()
292    .map(
293        |(field_name, column_type, semantic_type, datatype_extension, options)| ColumnSchema {
294            column_name: field_name.to_string(),
295            datatype: column_type as i32,
296            semantic_type: semantic_type as i32,
297            datatype_extension,
298            options,
299        },
300    )
301    .collect::<Vec<ColumnSchema>>()
302}
303
304fn build_otlp_build_in_row(
305    log: LogRecord,
306    parse_ctx: &mut ParseContext,
307) -> (Row, JsonbValue<'static>) {
308    let log_attr = key_value_to_jsonb(log.attributes);
309    let ts = if log.time_unix_nano != 0 {
310        log.time_unix_nano
311    } else {
312        log.observed_time_unix_nano
313    };
314
315    let row = vec![
316        GreptimeValue {
317            value_data: Some(ValueData::TimestampNanosecondValue(ts as i64)),
318        },
319        GreptimeValue {
320            value_data: Some(ValueData::StringValue(bytes_to_hex_string(&log.trace_id))),
321        },
322        GreptimeValue {
323            value_data: Some(ValueData::StringValue(bytes_to_hex_string(&log.span_id))),
324        },
325        GreptimeValue {
326            value_data: Some(ValueData::StringValue(log.severity_text)),
327        },
328        GreptimeValue {
329            value_data: Some(ValueData::I32Value(log.severity_number)),
330        },
331        GreptimeValue {
332            value_data: log
333                .body
334                .as_ref()
335                .map(|x| ValueData::StringValue(log_body_to_string(x))),
336        },
337        GreptimeValue {
338            value_data: Some(ValueData::BinaryValue(log_attr.to_vec())),
339        },
340        GreptimeValue {
341            value_data: Some(ValueData::U32Value(log.flags)),
342        },
343        GreptimeValue {
344            value_data: parse_ctx.scope_name.clone().map(ValueData::StringValue),
345        },
346        GreptimeValue {
347            value_data: parse_ctx.scope_version.clone().map(ValueData::StringValue),
348        },
349        GreptimeValue {
350            value_data: Some(ValueData::BinaryValue(parse_ctx.scope_attrs.to_vec())),
351        },
352        GreptimeValue {
353            value_data: Some(ValueData::StringValue(parse_ctx.scope_url.clone())),
354        },
355        GreptimeValue {
356            value_data: Some(ValueData::BinaryValue(parse_ctx.resource_attr.to_vec())),
357        },
358        GreptimeValue {
359            value_data: Some(ValueData::StringValue(parse_ctx.resource_url.clone())),
360        },
361    ];
362    (Row { values: row }, log_attr)
363}
364
365fn extract_field_from_attr_and_combine_schema(
366    select_info: &SelectInfo,
367    select_schema: &mut SchemaInfo,
368    attrs: &jsonb::Value,
369) -> Result<Vec<GreptimeValue>> {
370    // note we use schema.len instead of select_keys.len
371    // because the len of the row value should always matches the len of the schema
372    let mut extracted_values = vec![GreptimeValue::default(); select_schema.schema.len()];
373
374    for key in select_info.keys.iter() {
375        let Some(value) = attrs.get_by_name_ignore_case(key).cloned() else {
376            continue;
377        };
378        let Some((schema, value)) = decide_column_schema_and_convert_value(key, value)? else {
379            continue;
380        };
381
382        if let Some(index) = select_schema.index.get(key) {
383            let column_schema = &select_schema.schema[*index];
384            // datatype of the same column name should be the same
385            ensure!(
386                column_schema.datatype == schema.datatype,
387                IncompatibleSchemaSnafu {
388                    column_name: key,
389                    datatype: column_schema.datatype().as_str_name(),
390                    expected: column_schema.datatype,
391                    actual: schema.datatype,
392                }
393            );
394            extracted_values[*index] = value;
395        } else {
396            select_schema.schema.push(schema);
397            select_schema
398                .index
399                .insert(key.clone(), select_schema.schema.len() - 1);
400            extracted_values.push(value);
401        }
402    }
403
404    Ok(extracted_values)
405}
406
407fn decide_column_schema_and_convert_value(
408    column_name: &str,
409    value: JsonbValue,
410) -> Result<Option<(ColumnSchema, GreptimeValue)>> {
411    let column_info = match value {
412        JsonbValue::String(s) => Ok(Some((
413            GreptimeValue {
414                value_data: Some(ValueData::StringValue(s.into())),
415            },
416            ColumnDataType::String,
417            SemanticType::Tag,
418            None,
419        ))),
420        JsonbValue::Number(n) => match n {
421            JsonbNumber::Int64(i) => Ok(Some((
422                GreptimeValue {
423                    value_data: Some(ValueData::I64Value(i)),
424                },
425                ColumnDataType::Int64,
426                SemanticType::Tag,
427                None,
428            ))),
429            JsonbNumber::Float64(_) => UnsupportedJsonDataTypeForTagSnafu {
430                ty: "FLOAT".to_string(),
431                key: column_name,
432            }
433            .fail(),
434            JsonbNumber::UInt64(u) => Ok(Some((
435                GreptimeValue {
436                    value_data: Some(ValueData::U64Value(u)),
437                },
438                ColumnDataType::Uint64,
439                SemanticType::Tag,
440                None,
441            ))),
442        },
443        JsonbValue::Bool(b) => Ok(Some((
444            GreptimeValue {
445                value_data: Some(ValueData::BoolValue(b)),
446            },
447            ColumnDataType::Boolean,
448            SemanticType::Tag,
449            None,
450        ))),
451        JsonbValue::Array(_) | JsonbValue::Object(_) => UnsupportedJsonDataTypeForTagSnafu {
452            ty: "Json".to_string(),
453            key: column_name,
454        }
455        .fail(),
456        JsonbValue::Null => Ok(None),
457    };
458    column_info.map(|c| {
459        c.map(|(value, column_type, semantic_type, datatype_extension)| {
460            (
461                ColumnSchema {
462                    column_name: column_name.to_string(),
463                    datatype: column_type as i32,
464                    semantic_type: semantic_type as i32,
465                    datatype_extension,
466                    options: None,
467                },
468                value,
469            )
470        })
471    })
472}
473
474fn parse_export_logs_service_request_to_rows(
475    request: ExportLogsServiceRequest,
476    select_info: Box<SelectInfo>,
477) -> Result<Rows> {
478    let mut schemas = build_otlp_logs_identity_schema();
479
480    let mut parse_ctx = ParseContext::new(select_info);
481    let mut rows = parse_resource(&mut parse_ctx, request.resource_logs)?;
482
483    schemas.extend(parse_ctx.select_schema.schema);
484
485    rows.iter_mut().for_each(|row| {
486        row.values.resize(schemas.len(), GreptimeValue::default());
487    });
488
489    Ok(Rows {
490        schema: schemas,
491        rows,
492    })
493}
494
495fn parse_resource(
496    parse_ctx: &mut ParseContext,
497    resource_logs_vec: Vec<ResourceLogs>,
498) -> Result<Vec<Row>> {
499    let total_len = resource_logs_vec
500        .iter()
501        .flat_map(|r| r.scope_logs.iter())
502        .map(|s| s.log_records.len())
503        .sum();
504
505    let mut results = Vec::with_capacity(total_len);
506
507    for r in resource_logs_vec {
508        parse_ctx.resource_attr = r
509            .resource
510            .map(|resource| key_value_to_jsonb(resource.attributes))
511            .unwrap_or(JsonbValue::Null);
512
513        parse_ctx.resource_url = r.schema_url;
514
515        parse_ctx.resource_uplift_values = extract_field_from_attr_and_combine_schema(
516            &parse_ctx.select_info,
517            &mut parse_ctx.select_schema,
518            &parse_ctx.resource_attr,
519        )?;
520
521        let rows = parse_scope(r.scope_logs, parse_ctx)?;
522        results.extend(rows);
523    }
524    Ok(results)
525}
526
527struct ParseContext<'a> {
528    // input selected keys
529    select_info: Box<SelectInfo>,
530    // schema infos for selected keys from resource/scope/log for current request
531    // since the value override from bottom to top, the max capacity is the length of the keys
532    select_schema: SchemaInfo,
533
534    // extracted and uplifted values using select keys
535    resource_uplift_values: Vec<GreptimeValue>,
536    scope_uplift_values: Vec<GreptimeValue>,
537
538    // passdown values
539    resource_url: String,
540    resource_attr: JsonbValue<'a>,
541    scope_name: Option<String>,
542    scope_version: Option<String>,
543    scope_url: String,
544    scope_attrs: JsonbValue<'a>,
545}
546
547impl<'a> ParseContext<'a> {
548    pub fn new(select_info: Box<SelectInfo>) -> ParseContext<'a> {
549        let len = select_info.keys.len();
550        ParseContext {
551            select_info,
552            select_schema: SchemaInfo::with_capacity(len),
553            resource_uplift_values: vec![],
554            scope_uplift_values: vec![],
555            resource_url: String::new(),
556            resource_attr: JsonbValue::Null,
557            scope_name: None,
558            scope_version: None,
559            scope_url: String::new(),
560            scope_attrs: JsonbValue::Null,
561        }
562    }
563}
564
565fn parse_scope(scopes_log_vec: Vec<ScopeLogs>, parse_ctx: &mut ParseContext) -> Result<Vec<Row>> {
566    let len = scopes_log_vec.iter().map(|l| l.log_records.len()).sum();
567    let mut results = Vec::with_capacity(len);
568
569    for scope_logs in scopes_log_vec {
570        let (scope_attrs, scope_version, scope_name) = scope_to_jsonb(scope_logs.scope);
571        parse_ctx.scope_name = scope_name;
572        parse_ctx.scope_version = scope_version;
573        parse_ctx.scope_url = scope_logs.schema_url;
574        parse_ctx.scope_attrs = scope_attrs;
575
576        parse_ctx.scope_uplift_values = extract_field_from_attr_and_combine_schema(
577            &parse_ctx.select_info,
578            &mut parse_ctx.select_schema,
579            &parse_ctx.scope_attrs,
580        )?;
581
582        let rows = parse_log(scope_logs.log_records, parse_ctx)?;
583        results.extend(rows);
584    }
585    Ok(results)
586}
587
588fn parse_log(log_records: Vec<LogRecord>, parse_ctx: &mut ParseContext) -> Result<Vec<Row>> {
589    let mut result = Vec::with_capacity(log_records.len());
590
591    for log in log_records {
592        let (mut row, log_attr) = build_otlp_build_in_row(log, parse_ctx);
593
594        let log_values = extract_field_from_attr_and_combine_schema(
595            &parse_ctx.select_info,
596            &mut parse_ctx.select_schema,
597            &log_attr,
598        )?;
599
600        let extracted_values = merge_values(
601            log_values,
602            &parse_ctx.scope_uplift_values,
603            &parse_ctx.resource_uplift_values,
604        );
605
606        row.values.extend(extracted_values);
607
608        result.push(row);
609    }
610    Ok(result)
611}
612
613fn merge_values(
614    log: Vec<GreptimeValue>,
615    scope: &[GreptimeValue],
616    resource: &[GreptimeValue],
617) -> Vec<GreptimeValue> {
618    log.into_iter()
619        .enumerate()
620        .map(|(i, value)| GreptimeValue {
621            value_data: value
622                .value_data
623                .or_else(|| scope.get(i).and_then(|x| x.value_data.clone()))
624                .or_else(|| resource.get(i).and_then(|x| x.value_data.clone())),
625        })
626        .collect()
627}
628
629/// transform otlp logs request to pipeline value
630/// https://opentelemetry.io/docs/concepts/signals/logs/
631fn parse_export_logs_service_request(request: ExportLogsServiceRequest) -> Vec<VrlValue> {
632    let mut result = Vec::new();
633    for r in request.resource_logs {
634        let resource_attr = r
635            .resource
636            .map(|x| VrlValue::Object(key_value_to_map(x.attributes)))
637            .unwrap_or(VrlValue::Null);
638        let resource_schema_url = VrlValue::Bytes(r.schema_url.into());
639        for scope_logs in r.scope_logs {
640            let (scope_attrs, scope_version, scope_name) =
641                scope_to_pipeline_value(scope_logs.scope);
642            let scope_schema_url = VrlValue::Bytes(scope_logs.schema_url.into());
643            for log in scope_logs.log_records {
644                let value = log_to_pipeline_value(
645                    log,
646                    resource_schema_url.clone(),
647                    resource_attr.clone(),
648                    scope_schema_url.clone(),
649                    scope_name.clone(),
650                    scope_version.clone(),
651                    scope_attrs.clone(),
652                );
653                result.push(value);
654            }
655        }
656    }
657    result
658}
659
660// convert AnyValue to pipeline value
661fn any_value_to_vrl_value(value: any_value::Value) -> VrlValue {
662    match value {
663        any_value::Value::StringValue(s) => VrlValue::Bytes(s.into()),
664        any_value::Value::IntValue(i) => VrlValue::Integer(i),
665        any_value::Value::DoubleValue(d) => VrlValue::Float(NotNan::new(d).unwrap()),
666        any_value::Value::BoolValue(b) => VrlValue::Boolean(b),
667        any_value::Value::ArrayValue(array_value) => {
668            let values = array_value
669                .values
670                .into_iter()
671                .filter_map(|v| v.value.map(any_value_to_vrl_value))
672                .collect();
673            VrlValue::Array(values)
674        }
675        any_value::Value::KvlistValue(key_value_list) => {
676            VrlValue::Object(key_value_to_map(key_value_list.values))
677        }
678        any_value::Value::BytesValue(items) => VrlValue::Bytes(Bytes::from(items)),
679    }
680}
681
682// convert otlp keyValue vec to map
683fn key_value_to_map(key_values: Vec<KeyValue>) -> BTreeMap<KeyString, VrlValue> {
684    let mut map = BTreeMap::new();
685    for kv in key_values {
686        let value = match kv.value {
687            Some(value) => match value.value {
688                Some(value) => any_value_to_vrl_value(value),
689                None => VrlValue::Null,
690            },
691            None => VrlValue::Null,
692        };
693        map.insert(kv.key.into(), value);
694    }
695    map
696}
697
698fn log_body_to_string(body: &AnyValue) -> String {
699    let otlp_value = OtlpAnyValue::from(body);
700    otlp_value.to_string()
701}