servers/otlp/
logs.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap as StdHashMap;
16
17use api::v1::column_data_type_extension::TypeExt;
18use api::v1::value::ValueData;
19use api::v1::{
20    ColumnDataType, ColumnDataTypeExtension, ColumnOptions, ColumnSchema, JsonTypeExtension, Row,
21    RowInsertRequest, Rows, SemanticType, Value as GreptimeValue,
22};
23use jsonb::{Number as JsonbNumber, Value as JsonbValue};
24use opentelemetry_proto::tonic::collector::logs::v1::ExportLogsServiceRequest;
25use opentelemetry_proto::tonic::common::v1::{any_value, AnyValue, InstrumentationScope, KeyValue};
26use opentelemetry_proto::tonic::logs::v1::{LogRecord, ResourceLogs, ScopeLogs};
27use pipeline::{
28    ContextReq, GreptimePipelineParams, PipelineContext, PipelineWay, SchemaInfo, SelectInfo,
29};
30use serde_json::{Map, Value};
31use session::context::QueryContextRef;
32use snafu::{ensure, ResultExt};
33
34use crate::error::{
35    IncompatibleSchemaSnafu, NotSupportedSnafu, PipelineSnafu, Result,
36    UnsupportedJsonDataTypeForTagSnafu,
37};
38use crate::http::event::PipelineIngestRequest;
39use crate::otlp::trace::attributes::OtlpAnyValue;
40use crate::otlp::utils::{bytes_to_hex_string, key_value_to_jsonb};
41use crate::pipeline::run_pipeline;
42use crate::query_handler::PipelineHandlerRef;
43
/// Table name constant for OpenTelemetry logs.
///
/// NOTE(review): not referenced elsewhere in this file; presumably callers use it as the
/// default target table when none is supplied — confirm against the call sites.
pub const LOG_TABLE_NAME: &str = "opentelemetry_logs";
45
46/// Convert OpenTelemetry metrics to GreptimeDB insert requests
47///
48/// See
49/// <https://github.com/open-telemetry/opentelemetry-proto/blob/main/opentelemetry/proto/metrics/v1/metrics.proto>
50/// for data structure of OTLP metrics.
51///
52/// Returns `InsertRequests` and total number of rows to ingest
53pub async fn to_grpc_insert_requests(
54    request: ExportLogsServiceRequest,
55    pipeline: PipelineWay,
56    pipeline_params: GreptimePipelineParams,
57    table_name: String,
58    query_ctx: &QueryContextRef,
59    pipeline_handler: PipelineHandlerRef,
60) -> Result<ContextReq> {
61    match pipeline {
62        PipelineWay::OtlpLogDirect(select_info) => {
63            let rows = parse_export_logs_service_request_to_rows(request, select_info)?;
64            let insert_request = RowInsertRequest {
65                rows: Some(rows),
66                table_name,
67            };
68
69            Ok(ContextReq::default_opt_with_reqs(vec![insert_request]))
70        }
71        PipelineWay::Pipeline(pipeline_def) => {
72            let data = parse_export_logs_service_request(request);
73            let array = pipeline::json_array_to_map(data).context(PipelineSnafu)?;
74
75            let pipeline_ctx =
76                PipelineContext::new(&pipeline_def, &pipeline_params, query_ctx.channel());
77            run_pipeline(
78                &pipeline_handler,
79                &pipeline_ctx,
80                PipelineIngestRequest {
81                    table: table_name,
82                    values: array,
83                },
84                query_ctx,
85                true,
86            )
87            .await
88        }
89        _ => NotSupportedSnafu {
90            feat: "Unsupported pipeline for logs",
91        }
92        .fail(),
93    }
94}
95
96fn scope_to_pipeline_value(scope: Option<InstrumentationScope>) -> (Value, Value, Value) {
97    scope
98        .map(|x| {
99            (
100                Value::Object(key_value_to_map(x.attributes)),
101                Value::String(x.version),
102                Value::String(x.name),
103            )
104        })
105        .unwrap_or((Value::Null, Value::Null, Value::Null))
106}
107
108fn scope_to_jsonb(
109    scope: Option<InstrumentationScope>,
110) -> (JsonbValue<'static>, Option<String>, Option<String>) {
111    scope
112        .map(|x| {
113            (
114                key_value_to_jsonb(x.attributes),
115                Some(x.version),
116                Some(x.name),
117            )
118        })
119        .unwrap_or((JsonbValue::Null, None, None))
120}
121
122fn log_to_pipeline_value(
123    log: LogRecord,
124    resource_schema_url: Value,
125    resource_attr: Value,
126    scope_schema_url: Value,
127    scope_name: Value,
128    scope_version: Value,
129    scope_attrs: Value,
130) -> Value {
131    let log_attrs = Value::Object(key_value_to_map(log.attributes));
132    let mut map = Map::new();
133    map.insert("Timestamp".to_string(), Value::from(log.time_unix_nano));
134    map.insert(
135        "ObservedTimestamp".to_string(),
136        Value::from(log.observed_time_unix_nano),
137    );
138
139    // need to be convert to string
140    map.insert(
141        "TraceId".to_string(),
142        Value::String(bytes_to_hex_string(&log.trace_id)),
143    );
144    map.insert(
145        "SpanId".to_string(),
146        Value::String(bytes_to_hex_string(&log.span_id)),
147    );
148    map.insert("TraceFlags".to_string(), Value::from(log.flags));
149    map.insert("SeverityText".to_string(), Value::String(log.severity_text));
150    map.insert(
151        "SeverityNumber".to_string(),
152        Value::from(log.severity_number),
153    );
154    // need to be convert to string
155    map.insert(
156        "Body".to_string(),
157        log.body
158            .as_ref()
159            .map(|x| Value::String(log_body_to_string(x)))
160            .unwrap_or(Value::Null),
161    );
162    map.insert("ResourceSchemaUrl".to_string(), resource_schema_url);
163
164    map.insert("ResourceAttributes".to_string(), resource_attr);
165    map.insert("ScopeSchemaUrl".to_string(), scope_schema_url);
166    map.insert("ScopeName".to_string(), scope_name);
167    map.insert("ScopeVersion".to_string(), scope_version);
168    map.insert("ScopeAttributes".to_string(), scope_attrs);
169    map.insert("LogAttributes".to_string(), log_attrs);
170    Value::Object(map)
171}
172
173fn build_otlp_logs_identity_schema() -> Vec<ColumnSchema> {
174    [
175        (
176            "timestamp",
177            ColumnDataType::TimestampNanosecond,
178            SemanticType::Timestamp,
179            None,
180            None,
181        ),
182        (
183            "trace_id",
184            ColumnDataType::String,
185            SemanticType::Field,
186            None,
187            None,
188        ),
189        (
190            "span_id",
191            ColumnDataType::String,
192            SemanticType::Field,
193            None,
194            None,
195        ),
196        (
197            "severity_text",
198            ColumnDataType::String,
199            SemanticType::Field,
200            None,
201            None,
202        ),
203        (
204            "severity_number",
205            ColumnDataType::Int32,
206            SemanticType::Field,
207            None,
208            None,
209        ),
210        (
211            "body",
212            ColumnDataType::String,
213            SemanticType::Field,
214            None,
215            Some(ColumnOptions {
216                options: StdHashMap::from([(
217                    "fulltext".to_string(),
218                    r#"{"enable":true}"#.to_string(),
219                )]),
220            }),
221        ),
222        (
223            "log_attributes",
224            ColumnDataType::Binary,
225            SemanticType::Field,
226            Some(ColumnDataTypeExtension {
227                type_ext: Some(TypeExt::JsonType(JsonTypeExtension::JsonBinary.into())),
228            }),
229            None,
230        ),
231        (
232            "trace_flags",
233            ColumnDataType::Uint32,
234            SemanticType::Field,
235            None,
236            None,
237        ),
238        (
239            "scope_name",
240            ColumnDataType::String,
241            SemanticType::Tag,
242            None,
243            None,
244        ),
245        (
246            "scope_version",
247            ColumnDataType::String,
248            SemanticType::Field,
249            None,
250            None,
251        ),
252        (
253            "scope_attributes",
254            ColumnDataType::Binary,
255            SemanticType::Field,
256            Some(ColumnDataTypeExtension {
257                type_ext: Some(TypeExt::JsonType(JsonTypeExtension::JsonBinary.into())),
258            }),
259            None,
260        ),
261        (
262            "scope_schema_url",
263            ColumnDataType::String,
264            SemanticType::Field,
265            None,
266            None,
267        ),
268        (
269            "resource_attributes",
270            ColumnDataType::Binary,
271            SemanticType::Field,
272            Some(ColumnDataTypeExtension {
273                type_ext: Some(TypeExt::JsonType(JsonTypeExtension::JsonBinary.into())),
274            }),
275            None,
276        ),
277        (
278            "resource_schema_url",
279            ColumnDataType::String,
280            SemanticType::Field,
281            None,
282            None,
283        ),
284    ]
285    .into_iter()
286    .map(
287        |(field_name, column_type, semantic_type, datatype_extension, options)| ColumnSchema {
288            column_name: field_name.to_string(),
289            datatype: column_type as i32,
290            semantic_type: semantic_type as i32,
291            datatype_extension,
292            options,
293        },
294    )
295    .collect::<Vec<ColumnSchema>>()
296}
297
298fn build_otlp_build_in_row(
299    log: LogRecord,
300    parse_ctx: &mut ParseContext,
301) -> (Row, JsonbValue<'static>) {
302    let log_attr = key_value_to_jsonb(log.attributes);
303    let ts = if log.time_unix_nano != 0 {
304        log.time_unix_nano
305    } else {
306        log.observed_time_unix_nano
307    };
308
309    let row = vec![
310        GreptimeValue {
311            value_data: Some(ValueData::TimestampNanosecondValue(ts as i64)),
312        },
313        GreptimeValue {
314            value_data: Some(ValueData::StringValue(bytes_to_hex_string(&log.trace_id))),
315        },
316        GreptimeValue {
317            value_data: Some(ValueData::StringValue(bytes_to_hex_string(&log.span_id))),
318        },
319        GreptimeValue {
320            value_data: Some(ValueData::StringValue(log.severity_text)),
321        },
322        GreptimeValue {
323            value_data: Some(ValueData::I32Value(log.severity_number)),
324        },
325        GreptimeValue {
326            value_data: log
327                .body
328                .as_ref()
329                .map(|x| ValueData::StringValue(log_body_to_string(x))),
330        },
331        GreptimeValue {
332            value_data: Some(ValueData::BinaryValue(log_attr.to_vec())),
333        },
334        GreptimeValue {
335            value_data: Some(ValueData::U32Value(log.flags)),
336        },
337        GreptimeValue {
338            value_data: parse_ctx.scope_name.clone().map(ValueData::StringValue),
339        },
340        GreptimeValue {
341            value_data: parse_ctx.scope_version.clone().map(ValueData::StringValue),
342        },
343        GreptimeValue {
344            value_data: Some(ValueData::BinaryValue(parse_ctx.scope_attrs.to_vec())),
345        },
346        GreptimeValue {
347            value_data: Some(ValueData::StringValue(parse_ctx.scope_url.clone())),
348        },
349        GreptimeValue {
350            value_data: Some(ValueData::BinaryValue(parse_ctx.resource_attr.to_vec())),
351        },
352        GreptimeValue {
353            value_data: Some(ValueData::StringValue(parse_ctx.resource_url.clone())),
354        },
355    ];
356    (Row { values: row }, log_attr)
357}
358
359fn extract_field_from_attr_and_combine_schema(
360    select_info: &SelectInfo,
361    select_schema: &mut SchemaInfo,
362    attrs: &jsonb::Value,
363) -> Result<Vec<GreptimeValue>> {
364    // note we use schema.len instead of select_keys.len
365    // because the len of the row value should always matches the len of the schema
366    let mut extracted_values = vec![GreptimeValue::default(); select_schema.schema.len()];
367
368    for key in select_info.keys.iter() {
369        let Some(value) = attrs.get_by_name_ignore_case(key).cloned() else {
370            continue;
371        };
372        let Some((schema, value)) = decide_column_schema_and_convert_value(key, value)? else {
373            continue;
374        };
375
376        if let Some(index) = select_schema.index.get(key) {
377            let column_schema = &select_schema.schema[*index];
378            // datatype of the same column name should be the same
379            ensure!(
380                column_schema.datatype == schema.datatype,
381                IncompatibleSchemaSnafu {
382                    column_name: key,
383                    datatype: column_schema.datatype().as_str_name(),
384                    expected: column_schema.datatype,
385                    actual: schema.datatype,
386                }
387            );
388            extracted_values[*index] = value;
389        } else {
390            select_schema.schema.push(schema);
391            select_schema
392                .index
393                .insert(key.clone(), select_schema.schema.len() - 1);
394            extracted_values.push(value);
395        }
396    }
397
398    Ok(extracted_values)
399}
400
401fn decide_column_schema_and_convert_value(
402    column_name: &str,
403    value: JsonbValue,
404) -> Result<Option<(ColumnSchema, GreptimeValue)>> {
405    let column_info = match value {
406        JsonbValue::String(s) => Ok(Some((
407            GreptimeValue {
408                value_data: Some(ValueData::StringValue(s.into())),
409            },
410            ColumnDataType::String,
411            SemanticType::Tag,
412            None,
413        ))),
414        JsonbValue::Number(n) => match n {
415            JsonbNumber::Int64(i) => Ok(Some((
416                GreptimeValue {
417                    value_data: Some(ValueData::I64Value(i)),
418                },
419                ColumnDataType::Int64,
420                SemanticType::Tag,
421                None,
422            ))),
423            JsonbNumber::Float64(_) => UnsupportedJsonDataTypeForTagSnafu {
424                ty: "FLOAT".to_string(),
425                key: column_name,
426            }
427            .fail(),
428            JsonbNumber::UInt64(u) => Ok(Some((
429                GreptimeValue {
430                    value_data: Some(ValueData::U64Value(u)),
431                },
432                ColumnDataType::Uint64,
433                SemanticType::Tag,
434                None,
435            ))),
436        },
437        JsonbValue::Bool(b) => Ok(Some((
438            GreptimeValue {
439                value_data: Some(ValueData::BoolValue(b)),
440            },
441            ColumnDataType::Boolean,
442            SemanticType::Tag,
443            None,
444        ))),
445        JsonbValue::Array(_) | JsonbValue::Object(_) => UnsupportedJsonDataTypeForTagSnafu {
446            ty: "Json".to_string(),
447            key: column_name,
448        }
449        .fail(),
450        JsonbValue::Null => Ok(None),
451    };
452    column_info.map(|c| {
453        c.map(|(value, column_type, semantic_type, datatype_extension)| {
454            (
455                ColumnSchema {
456                    column_name: column_name.to_string(),
457                    datatype: column_type as i32,
458                    semantic_type: semantic_type as i32,
459                    datatype_extension,
460                    options: None,
461                },
462                value,
463            )
464        })
465    })
466}
467
468fn parse_export_logs_service_request_to_rows(
469    request: ExportLogsServiceRequest,
470    select_info: Box<SelectInfo>,
471) -> Result<Rows> {
472    let mut schemas = build_otlp_logs_identity_schema();
473
474    let mut parse_ctx = ParseContext::new(select_info);
475    let mut rows = parse_resource(&mut parse_ctx, request.resource_logs)?;
476
477    schemas.extend(parse_ctx.select_schema.schema);
478
479    rows.iter_mut().for_each(|row| {
480        row.values.resize(schemas.len(), GreptimeValue::default());
481    });
482
483    Ok(Rows {
484        schema: schemas,
485        rows,
486    })
487}
488
489fn parse_resource(
490    parse_ctx: &mut ParseContext,
491    resource_logs_vec: Vec<ResourceLogs>,
492) -> Result<Vec<Row>> {
493    let total_len = resource_logs_vec
494        .iter()
495        .flat_map(|r| r.scope_logs.iter())
496        .map(|s| s.log_records.len())
497        .sum();
498
499    let mut results = Vec::with_capacity(total_len);
500
501    for r in resource_logs_vec {
502        parse_ctx.resource_attr = r
503            .resource
504            .map(|resource| key_value_to_jsonb(resource.attributes))
505            .unwrap_or(JsonbValue::Null);
506
507        parse_ctx.resource_url = r.schema_url;
508
509        parse_ctx.resource_uplift_values = extract_field_from_attr_and_combine_schema(
510            &parse_ctx.select_info,
511            &mut parse_ctx.select_schema,
512            &parse_ctx.resource_attr,
513        )?;
514
515        let rows = parse_scope(r.scope_logs, parse_ctx)?;
516        results.extend(rows);
517    }
518    Ok(results)
519}
520
521struct ParseContext<'a> {
522    // input selected keys
523    select_info: Box<SelectInfo>,
524    // schema infos for selected keys from resource/scope/log for current request
525    // since the value override from bottom to top, the max capacity is the length of the keys
526    select_schema: SchemaInfo,
527
528    // extracted and uplifted values using select keys
529    resource_uplift_values: Vec<GreptimeValue>,
530    scope_uplift_values: Vec<GreptimeValue>,
531
532    // passdown values
533    resource_url: String,
534    resource_attr: JsonbValue<'a>,
535    scope_name: Option<String>,
536    scope_version: Option<String>,
537    scope_url: String,
538    scope_attrs: JsonbValue<'a>,
539}
540
541impl<'a> ParseContext<'a> {
542    pub fn new(select_info: Box<SelectInfo>) -> ParseContext<'a> {
543        let len = select_info.keys.len();
544        ParseContext {
545            select_info,
546            select_schema: SchemaInfo::with_capacity(len),
547            resource_uplift_values: vec![],
548            scope_uplift_values: vec![],
549            resource_url: String::new(),
550            resource_attr: JsonbValue::Null,
551            scope_name: None,
552            scope_version: None,
553            scope_url: String::new(),
554            scope_attrs: JsonbValue::Null,
555        }
556    }
557}
558
559fn parse_scope(scopes_log_vec: Vec<ScopeLogs>, parse_ctx: &mut ParseContext) -> Result<Vec<Row>> {
560    let len = scopes_log_vec.iter().map(|l| l.log_records.len()).sum();
561    let mut results = Vec::with_capacity(len);
562
563    for scope_logs in scopes_log_vec {
564        let (scope_attrs, scope_version, scope_name) = scope_to_jsonb(scope_logs.scope);
565        parse_ctx.scope_name = scope_name;
566        parse_ctx.scope_version = scope_version;
567        parse_ctx.scope_url = scope_logs.schema_url;
568        parse_ctx.scope_attrs = scope_attrs;
569
570        parse_ctx.scope_uplift_values = extract_field_from_attr_and_combine_schema(
571            &parse_ctx.select_info,
572            &mut parse_ctx.select_schema,
573            &parse_ctx.scope_attrs,
574        )?;
575
576        let rows = parse_log(scope_logs.log_records, parse_ctx)?;
577        results.extend(rows);
578    }
579    Ok(results)
580}
581
582fn parse_log(log_records: Vec<LogRecord>, parse_ctx: &mut ParseContext) -> Result<Vec<Row>> {
583    let mut result = Vec::with_capacity(log_records.len());
584
585    for log in log_records {
586        let (mut row, log_attr) = build_otlp_build_in_row(log, parse_ctx);
587
588        let log_values = extract_field_from_attr_and_combine_schema(
589            &parse_ctx.select_info,
590            &mut parse_ctx.select_schema,
591            &log_attr,
592        )?;
593
594        let extracted_values = merge_values(
595            log_values,
596            &parse_ctx.scope_uplift_values,
597            &parse_ctx.resource_uplift_values,
598        );
599
600        row.values.extend(extracted_values);
601
602        result.push(row);
603    }
604    Ok(result)
605}
606
607fn merge_values(
608    log: Vec<GreptimeValue>,
609    scope: &[GreptimeValue],
610    resource: &[GreptimeValue],
611) -> Vec<GreptimeValue> {
612    log.into_iter()
613        .enumerate()
614        .map(|(i, value)| GreptimeValue {
615            value_data: value
616                .value_data
617                .or_else(|| scope.get(i).and_then(|x| x.value_data.clone()))
618                .or_else(|| resource.get(i).and_then(|x| x.value_data.clone())),
619        })
620        .collect()
621}
622
623/// transform otlp logs request to pipeline value
624/// https://opentelemetry.io/docs/concepts/signals/logs/
625fn parse_export_logs_service_request(request: ExportLogsServiceRequest) -> Vec<Value> {
626    let mut result = Vec::new();
627    for r in request.resource_logs {
628        let resource_attr = r
629            .resource
630            .map(|x| Value::Object(key_value_to_map(x.attributes)))
631            .unwrap_or(Value::Null);
632        let resource_schema_url = Value::String(r.schema_url);
633        for scope_logs in r.scope_logs {
634            let (scope_attrs, scope_version, scope_name) =
635                scope_to_pipeline_value(scope_logs.scope);
636            let scope_schema_url = Value::String(scope_logs.schema_url);
637            for log in scope_logs.log_records {
638                let value = log_to_pipeline_value(
639                    log,
640                    resource_schema_url.clone(),
641                    resource_attr.clone(),
642                    scope_schema_url.clone(),
643                    scope_name.clone(),
644                    scope_version.clone(),
645                    scope_attrs.clone(),
646                );
647                result.push(value);
648            }
649        }
650    }
651    result
652}
653
654// convert AnyValue to pipeline value
655fn any_value_to_pipeline_value(value: any_value::Value) -> Value {
656    match value {
657        any_value::Value::StringValue(s) => Value::String(s),
658        any_value::Value::IntValue(i) => Value::from(i),
659        any_value::Value::DoubleValue(d) => Value::from(d),
660        any_value::Value::BoolValue(b) => Value::Bool(b),
661        any_value::Value::ArrayValue(a) => {
662            let values = a
663                .values
664                .into_iter()
665                .map(|v| match v.value {
666                    Some(value) => any_value_to_pipeline_value(value),
667                    None => Value::Null,
668                })
669                .collect();
670            Value::Array(values)
671        }
672        any_value::Value::KvlistValue(kv) => {
673            let value = key_value_to_map(kv.values);
674            Value::Object(value)
675        }
676        any_value::Value::BytesValue(b) => Value::String(bytes_to_hex_string(&b)),
677    }
678}
679
680// convert otlp keyValue vec to map
681fn key_value_to_map(key_values: Vec<KeyValue>) -> Map<String, Value> {
682    let mut map = Map::new();
683    for kv in key_values {
684        let value = match kv.value {
685            Some(value) => match value.value {
686                Some(value) => any_value_to_pipeline_value(value),
687                None => Value::Null,
688            },
689            None => Value::Null,
690        };
691        map.insert(kv.key.clone(), value);
692    }
693    map
694}
695
696fn log_body_to_string(body: &AnyValue) -> String {
697    let otlp_value = OtlpAnyValue::from(body);
698    otlp_value.to_string()
699}