servers/otlp/
logs.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, HashMap as StdHashMap};
16
17use api::v1::column_data_type_extension::TypeExt;
18use api::v1::value::ValueData;
19use api::v1::{
20    ColumnDataType, ColumnDataTypeExtension, ColumnOptions, ColumnSchema, JsonTypeExtension, Row,
21    RowInsertRequest, Rows, SemanticType, Value as GreptimeValue,
22};
23use bytes::Bytes;
24use jsonb::{Number as JsonbNumber, Value as JsonbValue};
25use opentelemetry_proto::tonic::collector::logs::v1::ExportLogsServiceRequest;
26use opentelemetry_proto::tonic::common::v1::{AnyValue, InstrumentationScope, KeyValue, any_value};
27use opentelemetry_proto::tonic::logs::v1::{LogRecord, ResourceLogs, ScopeLogs};
28use pipeline::{
29    ContextReq, GreptimePipelineParams, PipelineContext, PipelineWay, SchemaInfo, SelectInfo,
30};
31use session::context::QueryContextRef;
32use snafu::ensure;
33use vrl::prelude::NotNan;
34use vrl::value::{KeyString, Value as VrlValue};
35
36use crate::error::{
37    IncompatibleSchemaSnafu, NotSupportedSnafu, Result, UnsupportedJsonDataTypeForTagSnafu,
38};
39use crate::http::event::PipelineIngestRequest;
40use crate::otlp::trace::attributes::OtlpAnyValue;
41use crate::otlp::utils::{bytes_to_hex_string, key_value_to_jsonb};
42use crate::pipeline::run_pipeline;
43use crate::query_handler::PipelineHandlerRef;
44
45pub const LOG_TABLE_NAME: &str = "opentelemetry_logs";
46
47/// Convert OpenTelemetry metrics to GreptimeDB insert requests
48///
49/// See
50/// <https://github.com/open-telemetry/opentelemetry-proto/blob/main/opentelemetry/proto/metrics/v1/metrics.proto>
51/// for data structure of OTLP metrics.
52///
53/// Returns `InsertRequests` and total number of rows to ingest
54pub async fn to_grpc_insert_requests(
55    request: ExportLogsServiceRequest,
56    pipeline: PipelineWay,
57    pipeline_params: GreptimePipelineParams,
58    table_name: String,
59    query_ctx: &QueryContextRef,
60    pipeline_handler: PipelineHandlerRef,
61) -> Result<ContextReq> {
62    match pipeline {
63        PipelineWay::OtlpLogDirect(select_info) => {
64            let rows = parse_export_logs_service_request_to_rows(request, select_info)?;
65            let insert_request = RowInsertRequest {
66                rows: Some(rows),
67                table_name,
68            };
69
70            Ok(ContextReq::default_opt_with_reqs(vec![insert_request]))
71        }
72        PipelineWay::Pipeline(pipeline_def) => {
73            let array = parse_export_logs_service_request(request);
74
75            let pipeline_ctx =
76                PipelineContext::new(&pipeline_def, &pipeline_params, query_ctx.channel());
77            run_pipeline(
78                &pipeline_handler,
79                &pipeline_ctx,
80                PipelineIngestRequest {
81                    table: table_name,
82                    values: array,
83                },
84                query_ctx,
85                true,
86            )
87            .await
88        }
89        _ => NotSupportedSnafu {
90            feat: "Unsupported pipeline for logs",
91        }
92        .fail(),
93    }
94}
95
96fn scope_to_pipeline_value(scope: Option<InstrumentationScope>) -> (VrlValue, VrlValue, VrlValue) {
97    scope
98        .map(|x| {
99            (
100                VrlValue::Object(key_value_to_map(x.attributes)),
101                VrlValue::Bytes(x.version.into()),
102                VrlValue::Bytes(x.name.into()),
103            )
104        })
105        .unwrap_or((VrlValue::Null, VrlValue::Null, VrlValue::Null))
106}
107
108fn scope_to_jsonb(
109    scope: Option<InstrumentationScope>,
110) -> (JsonbValue<'static>, Option<String>, Option<String>) {
111    scope
112        .map(|x| {
113            (
114                key_value_to_jsonb(x.attributes),
115                Some(x.version),
116                Some(x.name),
117            )
118        })
119        .unwrap_or((JsonbValue::Null, None, None))
120}
121
122fn log_to_pipeline_value(
123    log: LogRecord,
124    resource_schema_url: VrlValue,
125    resource_attr: VrlValue,
126    scope_schema_url: VrlValue,
127    scope_name: VrlValue,
128    scope_version: VrlValue,
129    scope_attrs: VrlValue,
130) -> VrlValue {
131    let log_attrs = VrlValue::Object(key_value_to_map(log.attributes));
132    let mut map = BTreeMap::new();
133    map.insert(
134        "Timestamp".into(),
135        VrlValue::Integer(log.time_unix_nano as i64),
136    );
137    map.insert(
138        "ObservedTimestamp".into(),
139        VrlValue::Integer(log.observed_time_unix_nano as i64),
140    );
141
142    // need to be convert to string
143    map.insert(
144        "TraceId".into(),
145        VrlValue::Bytes(bytes_to_hex_string(&log.trace_id).into()),
146    );
147    map.insert(
148        "SpanId".into(),
149        VrlValue::Bytes(bytes_to_hex_string(&log.span_id).into()),
150    );
151    map.insert("TraceFlags".into(), VrlValue::Integer(log.flags as i64));
152    map.insert(
153        "SeverityText".into(),
154        VrlValue::Bytes(log.severity_text.into()),
155    );
156    map.insert(
157        "SeverityNumber".into(),
158        VrlValue::Integer(log.severity_number as i64),
159    );
160    // need to be convert to string
161    map.insert(
162        "Body".into(),
163        log.body
164            .as_ref()
165            .map(|x| VrlValue::Bytes(log_body_to_string(x).into()))
166            .unwrap_or(VrlValue::Null),
167    );
168    map.insert("ResourceSchemaUrl".into(), resource_schema_url);
169
170    map.insert("ResourceAttributes".into(), resource_attr);
171    map.insert("ScopeSchemaUrl".into(), scope_schema_url);
172    map.insert("ScopeName".into(), scope_name);
173    map.insert("ScopeVersion".into(), scope_version);
174    map.insert("ScopeAttributes".into(), scope_attrs);
175    map.insert("LogAttributes".into(), log_attrs);
176    VrlValue::Object(map)
177}
178
179fn build_otlp_logs_identity_schema() -> Vec<ColumnSchema> {
180    [
181        (
182            "timestamp",
183            ColumnDataType::TimestampNanosecond,
184            SemanticType::Timestamp,
185            None,
186            None,
187        ),
188        (
189            "trace_id",
190            ColumnDataType::String,
191            SemanticType::Field,
192            None,
193            None,
194        ),
195        (
196            "span_id",
197            ColumnDataType::String,
198            SemanticType::Field,
199            None,
200            None,
201        ),
202        (
203            "severity_text",
204            ColumnDataType::String,
205            SemanticType::Field,
206            None,
207            None,
208        ),
209        (
210            "severity_number",
211            ColumnDataType::Int32,
212            SemanticType::Field,
213            None,
214            None,
215        ),
216        (
217            "body",
218            ColumnDataType::String,
219            SemanticType::Field,
220            None,
221            Some(ColumnOptions {
222                options: StdHashMap::from([(
223                    "fulltext".to_string(),
224                    r#"{"enable":true}"#.to_string(),
225                )]),
226            }),
227        ),
228        (
229            "log_attributes",
230            ColumnDataType::Binary,
231            SemanticType::Field,
232            Some(ColumnDataTypeExtension {
233                type_ext: Some(TypeExt::JsonType(JsonTypeExtension::JsonBinary.into())),
234            }),
235            None,
236        ),
237        (
238            "trace_flags",
239            ColumnDataType::Uint32,
240            SemanticType::Field,
241            None,
242            None,
243        ),
244        (
245            "scope_name",
246            ColumnDataType::String,
247            SemanticType::Tag,
248            None,
249            None,
250        ),
251        (
252            "scope_version",
253            ColumnDataType::String,
254            SemanticType::Field,
255            None,
256            None,
257        ),
258        (
259            "scope_attributes",
260            ColumnDataType::Binary,
261            SemanticType::Field,
262            Some(ColumnDataTypeExtension {
263                type_ext: Some(TypeExt::JsonType(JsonTypeExtension::JsonBinary.into())),
264            }),
265            None,
266        ),
267        (
268            "scope_schema_url",
269            ColumnDataType::String,
270            SemanticType::Field,
271            None,
272            None,
273        ),
274        (
275            "resource_attributes",
276            ColumnDataType::Binary,
277            SemanticType::Field,
278            Some(ColumnDataTypeExtension {
279                type_ext: Some(TypeExt::JsonType(JsonTypeExtension::JsonBinary.into())),
280            }),
281            None,
282        ),
283        (
284            "resource_schema_url",
285            ColumnDataType::String,
286            SemanticType::Field,
287            None,
288            None,
289        ),
290    ]
291    .into_iter()
292    .map(
293        |(field_name, column_type, semantic_type, datatype_extension, options)| ColumnSchema {
294            column_name: field_name.to_string(),
295            datatype: column_type as i32,
296            semantic_type: semantic_type as i32,
297            datatype_extension,
298            options,
299        },
300    )
301    .collect::<Vec<ColumnSchema>>()
302}
303
304fn build_otlp_build_in_row(
305    log: LogRecord,
306    parse_ctx: &mut ParseContext,
307) -> (Row, JsonbValue<'static>) {
308    let log_attr = key_value_to_jsonb(log.attributes);
309    let ts = if log.time_unix_nano != 0 {
310        log.time_unix_nano
311    } else {
312        log.observed_time_unix_nano
313    };
314
315    let row = vec![
316        GreptimeValue {
317            value_data: Some(ValueData::TimestampNanosecondValue(ts as i64)),
318        },
319        GreptimeValue {
320            value_data: Some(ValueData::StringValue(bytes_to_hex_string(&log.trace_id))),
321        },
322        GreptimeValue {
323            value_data: Some(ValueData::StringValue(bytes_to_hex_string(&log.span_id))),
324        },
325        GreptimeValue {
326            value_data: Some(ValueData::StringValue(log.severity_text)),
327        },
328        GreptimeValue {
329            value_data: Some(ValueData::I32Value(log.severity_number)),
330        },
331        GreptimeValue {
332            value_data: log
333                .body
334                .as_ref()
335                .map(|x| ValueData::StringValue(log_body_to_string(x))),
336        },
337        GreptimeValue {
338            value_data: Some(ValueData::BinaryValue(log_attr.to_vec())),
339        },
340        GreptimeValue {
341            value_data: Some(ValueData::U32Value(log.flags)),
342        },
343        GreptimeValue {
344            value_data: parse_ctx.scope_name.clone().map(ValueData::StringValue),
345        },
346        GreptimeValue {
347            value_data: parse_ctx.scope_version.clone().map(ValueData::StringValue),
348        },
349        GreptimeValue {
350            value_data: Some(ValueData::BinaryValue(parse_ctx.scope_attrs.to_vec())),
351        },
352        GreptimeValue {
353            value_data: Some(ValueData::StringValue(parse_ctx.scope_url.clone())),
354        },
355        GreptimeValue {
356            value_data: Some(ValueData::BinaryValue(parse_ctx.resource_attr.to_vec())),
357        },
358        GreptimeValue {
359            value_data: Some(ValueData::StringValue(parse_ctx.resource_url.clone())),
360        },
361    ];
362    (Row { values: row }, log_attr)
363}
364
365fn extract_field_from_attr_and_combine_schema(
366    select_info: &SelectInfo,
367    select_schema: &mut SchemaInfo,
368    attrs: &jsonb::Value,
369) -> Result<Vec<GreptimeValue>> {
370    // note we use schema.len instead of select_keys.len
371    // because the len of the row value should always matches the len of the schema
372    let mut extracted_values = vec![GreptimeValue::default(); select_schema.schema.len()];
373
374    for key in select_info.keys.iter() {
375        let Some(value) = attrs.get_by_name_ignore_case(key).cloned() else {
376            continue;
377        };
378        let Some((schema, value)) = decide_column_schema_and_convert_value(key, value)? else {
379            continue;
380        };
381
382        if let Some(index) = select_schema.index.get(key) {
383            let column_schema = &select_schema.schema[*index];
384            let column_schema: ColumnSchema = column_schema.clone().try_into()?;
385            // datatype of the same column name should be the same
386            ensure!(
387                column_schema.datatype == schema.datatype,
388                IncompatibleSchemaSnafu {
389                    column_name: key,
390                    datatype: column_schema.datatype().as_str_name(),
391                    expected: column_schema.datatype,
392                    actual: schema.datatype,
393                }
394            );
395            extracted_values[*index] = value;
396        } else {
397            select_schema.schema.push(schema.into());
398            select_schema
399                .index
400                .insert(key.clone(), select_schema.schema.len() - 1);
401            extracted_values.push(value);
402        }
403    }
404
405    Ok(extracted_values)
406}
407
408fn decide_column_schema_and_convert_value(
409    column_name: &str,
410    value: JsonbValue,
411) -> Result<Option<(ColumnSchema, GreptimeValue)>> {
412    let column_info = match value {
413        JsonbValue::String(s) => Ok(Some((
414            GreptimeValue {
415                value_data: Some(ValueData::StringValue(s.into())),
416            },
417            ColumnDataType::String,
418            SemanticType::Tag,
419            None,
420        ))),
421        JsonbValue::Number(n) => match n {
422            JsonbNumber::Int64(i) => Ok(Some((
423                GreptimeValue {
424                    value_data: Some(ValueData::I64Value(i)),
425                },
426                ColumnDataType::Int64,
427                SemanticType::Tag,
428                None,
429            ))),
430            JsonbNumber::Float64(_) => UnsupportedJsonDataTypeForTagSnafu {
431                ty: "FLOAT".to_string(),
432                key: column_name,
433            }
434            .fail(),
435            JsonbNumber::UInt64(u) => Ok(Some((
436                GreptimeValue {
437                    value_data: Some(ValueData::U64Value(u)),
438                },
439                ColumnDataType::Uint64,
440                SemanticType::Tag,
441                None,
442            ))),
443        },
444        JsonbValue::Bool(b) => Ok(Some((
445            GreptimeValue {
446                value_data: Some(ValueData::BoolValue(b)),
447            },
448            ColumnDataType::Boolean,
449            SemanticType::Tag,
450            None,
451        ))),
452        JsonbValue::Array(_) | JsonbValue::Object(_) => UnsupportedJsonDataTypeForTagSnafu {
453            ty: "Json".to_string(),
454            key: column_name,
455        }
456        .fail(),
457        JsonbValue::Null => Ok(None),
458    };
459    column_info.map(|c| {
460        c.map(|(value, column_type, semantic_type, datatype_extension)| {
461            (
462                ColumnSchema {
463                    column_name: column_name.to_string(),
464                    datatype: column_type as i32,
465                    semantic_type: semantic_type as i32,
466                    datatype_extension,
467                    options: None,
468                },
469                value,
470            )
471        })
472    })
473}
474
475fn parse_export_logs_service_request_to_rows(
476    request: ExportLogsServiceRequest,
477    select_info: Box<SelectInfo>,
478) -> Result<Rows> {
479    let mut schemas = build_otlp_logs_identity_schema();
480
481    let mut parse_ctx = ParseContext::new(select_info);
482    let mut rows = parse_resource(&mut parse_ctx, request.resource_logs)?;
483
484    schemas.extend(parse_ctx.select_schema.column_schemas()?);
485
486    rows.iter_mut().for_each(|row| {
487        row.values.resize(schemas.len(), GreptimeValue::default());
488    });
489
490    Ok(Rows {
491        schema: schemas,
492        rows,
493    })
494}
495
496fn parse_resource(
497    parse_ctx: &mut ParseContext,
498    resource_logs_vec: Vec<ResourceLogs>,
499) -> Result<Vec<Row>> {
500    let total_len = resource_logs_vec
501        .iter()
502        .flat_map(|r| r.scope_logs.iter())
503        .map(|s| s.log_records.len())
504        .sum();
505
506    let mut results = Vec::with_capacity(total_len);
507
508    for r in resource_logs_vec {
509        parse_ctx.resource_attr = r
510            .resource
511            .map(|resource| key_value_to_jsonb(resource.attributes))
512            .unwrap_or(JsonbValue::Null);
513
514        parse_ctx.resource_url = r.schema_url;
515
516        parse_ctx.resource_uplift_values = extract_field_from_attr_and_combine_schema(
517            &parse_ctx.select_info,
518            &mut parse_ctx.select_schema,
519            &parse_ctx.resource_attr,
520        )?;
521
522        let rows = parse_scope(r.scope_logs, parse_ctx)?;
523        results.extend(rows);
524    }
525    Ok(results)
526}
527
528struct ParseContext<'a> {
529    // input selected keys
530    select_info: Box<SelectInfo>,
531    // schema infos for selected keys from resource/scope/log for current request
532    // since the value override from bottom to top, the max capacity is the length of the keys
533    select_schema: SchemaInfo,
534
535    // extracted and uplifted values using select keys
536    resource_uplift_values: Vec<GreptimeValue>,
537    scope_uplift_values: Vec<GreptimeValue>,
538
539    // passdown values
540    resource_url: String,
541    resource_attr: JsonbValue<'a>,
542    scope_name: Option<String>,
543    scope_version: Option<String>,
544    scope_url: String,
545    scope_attrs: JsonbValue<'a>,
546}
547
548impl<'a> ParseContext<'a> {
549    pub fn new(select_info: Box<SelectInfo>) -> ParseContext<'a> {
550        let len = select_info.keys.len();
551        ParseContext {
552            select_info,
553            select_schema: SchemaInfo::with_capacity(len),
554            resource_uplift_values: vec![],
555            scope_uplift_values: vec![],
556            resource_url: String::new(),
557            resource_attr: JsonbValue::Null,
558            scope_name: None,
559            scope_version: None,
560            scope_url: String::new(),
561            scope_attrs: JsonbValue::Null,
562        }
563    }
564}
565
566fn parse_scope(scopes_log_vec: Vec<ScopeLogs>, parse_ctx: &mut ParseContext) -> Result<Vec<Row>> {
567    let len = scopes_log_vec.iter().map(|l| l.log_records.len()).sum();
568    let mut results = Vec::with_capacity(len);
569
570    for scope_logs in scopes_log_vec {
571        let (scope_attrs, scope_version, scope_name) = scope_to_jsonb(scope_logs.scope);
572        parse_ctx.scope_name = scope_name;
573        parse_ctx.scope_version = scope_version;
574        parse_ctx.scope_url = scope_logs.schema_url;
575        parse_ctx.scope_attrs = scope_attrs;
576
577        parse_ctx.scope_uplift_values = extract_field_from_attr_and_combine_schema(
578            &parse_ctx.select_info,
579            &mut parse_ctx.select_schema,
580            &parse_ctx.scope_attrs,
581        )?;
582
583        let rows = parse_log(scope_logs.log_records, parse_ctx)?;
584        results.extend(rows);
585    }
586    Ok(results)
587}
588
589fn parse_log(log_records: Vec<LogRecord>, parse_ctx: &mut ParseContext) -> Result<Vec<Row>> {
590    let mut result = Vec::with_capacity(log_records.len());
591
592    for log in log_records {
593        let (mut row, log_attr) = build_otlp_build_in_row(log, parse_ctx);
594
595        let log_values = extract_field_from_attr_and_combine_schema(
596            &parse_ctx.select_info,
597            &mut parse_ctx.select_schema,
598            &log_attr,
599        )?;
600
601        let extracted_values = merge_values(
602            log_values,
603            &parse_ctx.scope_uplift_values,
604            &parse_ctx.resource_uplift_values,
605        );
606
607        row.values.extend(extracted_values);
608
609        result.push(row);
610    }
611    Ok(result)
612}
613
614fn merge_values(
615    log: Vec<GreptimeValue>,
616    scope: &[GreptimeValue],
617    resource: &[GreptimeValue],
618) -> Vec<GreptimeValue> {
619    log.into_iter()
620        .enumerate()
621        .map(|(i, value)| GreptimeValue {
622            value_data: value
623                .value_data
624                .or_else(|| scope.get(i).and_then(|x| x.value_data.clone()))
625                .or_else(|| resource.get(i).and_then(|x| x.value_data.clone())),
626        })
627        .collect()
628}
629
630/// transform otlp logs request to pipeline value
631/// https://opentelemetry.io/docs/concepts/signals/logs/
632fn parse_export_logs_service_request(request: ExportLogsServiceRequest) -> Vec<VrlValue> {
633    let mut result = Vec::new();
634    for r in request.resource_logs {
635        let resource_attr = r
636            .resource
637            .map(|x| VrlValue::Object(key_value_to_map(x.attributes)))
638            .unwrap_or(VrlValue::Null);
639        let resource_schema_url = VrlValue::Bytes(r.schema_url.into());
640        for scope_logs in r.scope_logs {
641            let (scope_attrs, scope_version, scope_name) =
642                scope_to_pipeline_value(scope_logs.scope);
643            let scope_schema_url = VrlValue::Bytes(scope_logs.schema_url.into());
644            for log in scope_logs.log_records {
645                let value = log_to_pipeline_value(
646                    log,
647                    resource_schema_url.clone(),
648                    resource_attr.clone(),
649                    scope_schema_url.clone(),
650                    scope_name.clone(),
651                    scope_version.clone(),
652                    scope_attrs.clone(),
653                );
654                result.push(value);
655            }
656        }
657    }
658    result
659}
660
661// convert AnyValue to pipeline value
662fn any_value_to_vrl_value(value: any_value::Value) -> VrlValue {
663    match value {
664        any_value::Value::StringValue(s) => VrlValue::Bytes(s.into()),
665        any_value::Value::IntValue(i) => VrlValue::Integer(i),
666        any_value::Value::DoubleValue(d) => VrlValue::Float(NotNan::new(d).unwrap()),
667        any_value::Value::BoolValue(b) => VrlValue::Boolean(b),
668        any_value::Value::ArrayValue(array_value) => {
669            let values = array_value
670                .values
671                .into_iter()
672                .filter_map(|v| v.value.map(any_value_to_vrl_value))
673                .collect();
674            VrlValue::Array(values)
675        }
676        any_value::Value::KvlistValue(key_value_list) => {
677            VrlValue::Object(key_value_to_map(key_value_list.values))
678        }
679        any_value::Value::BytesValue(items) => VrlValue::Bytes(Bytes::from(items)),
680    }
681}
682
683// convert otlp keyValue vec to map
684fn key_value_to_map(key_values: Vec<KeyValue>) -> BTreeMap<KeyString, VrlValue> {
685    let mut map = BTreeMap::new();
686    for kv in key_values {
687        let value = match kv.value {
688            Some(value) => match value.value {
689                Some(value) => any_value_to_vrl_value(value),
690                None => VrlValue::Null,
691            },
692            None => VrlValue::Null,
693        };
694        map.insert(kv.key.into(), value);
695    }
696    map
697}
698
699fn log_body_to_string(body: &AnyValue) -> String {
700    let otlp_value = OtlpAnyValue::from(body);
701    otlp_value.to_string()
702}