Skip to main content

servers/otlp/
logs.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::BTreeMap;
16
17use ahash::{HashMap, HashMapExt};
18use api::helper::ColumnDataTypeWrapper;
19use api::v1::column_data_type_extension::TypeExt;
20use api::v1::column_def::options_from_column_schema;
21use api::v1::value::ValueData;
22use api::v1::{
23    ColumnDataType, ColumnDataTypeExtension, ColumnOptions, ColumnSchema, JsonTypeExtension, Row,
24    RowInsertRequest, Rows, SemanticType, Value as GreptimeValue,
25};
26use bytes::Bytes;
27use common_time::Timestamp;
28use common_time::timestamp::TimeUnit;
29use jsonb::{Number as JsonbNumber, Value as JsonbValue};
30use opentelemetry_proto::tonic::collector::logs::v1::ExportLogsServiceRequest;
31use opentelemetry_proto::tonic::common::v1::{AnyValue, InstrumentationScope, KeyValue, any_value};
32use opentelemetry_proto::tonic::logs::v1::{LogRecord, ResourceLogs, ScopeLogs};
33use pipeline::{
34    ContextReq, GreptimePipelineParams, PipelineContext, PipelineWay, SchemaInfo, SelectInfo,
35};
36use session::context::QueryContextRef;
37use snafu::ensure;
38use vrl::prelude::NotNan;
39use vrl::value::{KeyString, Value as VrlValue};
40
41use crate::error::{
42    Error, IncompatibleSchemaSnafu, InvalidParameterSnafu, NotSupportedSnafu, Result,
43    UnsupportedJsonDataTypeForTagSnafu,
44};
45use crate::http::event::PipelineIngestRequest;
46use crate::otlp::coerce::coerce_value_data;
47use crate::otlp::trace::attributes::OtlpAnyValue;
48use crate::otlp::utils::{bytes_to_hex_string, key_value_to_jsonb};
49use crate::pipeline::run_pipeline;
50use crate::query_handler::PipelineHandlerRef;
51
/// Table name `opentelemetry_logs` used for OpenTelemetry logs.
///
/// NOTE(review): presumably the default target table when the caller does not
/// name one — confirm at the call sites.
pub const LOG_TABLE_NAME: &str = "opentelemetry_logs";
53
54/// Convert OpenTelemetry metrics to GreptimeDB insert requests
55///
56/// See
57/// <https://github.com/open-telemetry/opentelemetry-proto/blob/main/opentelemetry/proto/metrics/v1/metrics.proto>
58/// for data structure of OTLP metrics.
59///
60/// Returns `InsertRequests` and total number of rows to ingest
61pub async fn to_grpc_insert_requests(
62    request: ExportLogsServiceRequest,
63    pipeline: PipelineWay,
64    pipeline_params: GreptimePipelineParams,
65    table_name: String,
66    query_ctx: &QueryContextRef,
67    pipeline_handler: PipelineHandlerRef,
68) -> Result<ContextReq> {
69    match pipeline {
70        PipelineWay::OtlpLogDirect(select_info) => {
71            let table = pipeline_handler
72                .get_table(&table_name, query_ctx)
73                .await
74                .map_err(Error::from)?;
75            let existing_schema = table
76                .as_deref()
77                .map(ExistingLogSchema::try_from_table)
78                .transpose()?;
79            let rows = parse_export_logs_service_request_to_rows(
80                request,
81                select_info,
82                existing_schema.as_ref(),
83                &table_name,
84            )?;
85            let insert_request = RowInsertRequest {
86                rows: Some(rows),
87                table_name,
88            };
89
90            Ok(ContextReq::default_opt_with_reqs(vec![insert_request]))
91        }
92        PipelineWay::Pipeline(pipeline_def) => {
93            let array = parse_export_logs_service_request(request);
94
95            let pipeline_ctx =
96                PipelineContext::new(&pipeline_def, &pipeline_params, query_ctx.channel());
97            run_pipeline(
98                &pipeline_handler,
99                &pipeline_ctx,
100                PipelineIngestRequest {
101                    table: table_name,
102                    values: array,
103                },
104                query_ctx,
105                true,
106            )
107            .await
108        }
109        _ => NotSupportedSnafu {
110            feat: "Unsupported pipeline for logs",
111        }
112        .fail(),
113    }
114}
115
116fn scope_to_pipeline_value(scope: Option<InstrumentationScope>) -> (VrlValue, VrlValue, VrlValue) {
117    scope
118        .map(|x| {
119            (
120                VrlValue::Object(key_value_to_map(x.attributes)),
121                VrlValue::Bytes(x.version.into()),
122                VrlValue::Bytes(x.name.into()),
123            )
124        })
125        .unwrap_or((VrlValue::Null, VrlValue::Null, VrlValue::Null))
126}
127
128fn scope_to_jsonb(
129    scope: Option<InstrumentationScope>,
130) -> (JsonbValue<'static>, Option<String>, Option<String>) {
131    scope
132        .map(|x| {
133            (
134                key_value_to_jsonb(x.attributes),
135                Some(x.version),
136                Some(x.name),
137            )
138        })
139        .unwrap_or((JsonbValue::Null, None, None))
140}
141
142fn log_to_pipeline_value(
143    log: LogRecord,
144    resource_schema_url: VrlValue,
145    resource_attr: VrlValue,
146    scope_schema_url: VrlValue,
147    scope_name: VrlValue,
148    scope_version: VrlValue,
149    scope_attrs: VrlValue,
150) -> VrlValue {
151    let log_attrs = VrlValue::Object(key_value_to_map(log.attributes));
152    let mut map = BTreeMap::new();
153    map.insert(
154        "Timestamp".into(),
155        VrlValue::Integer(log.time_unix_nano as i64),
156    );
157    map.insert(
158        "ObservedTimestamp".into(),
159        VrlValue::Integer(log.observed_time_unix_nano as i64),
160    );
161
162    // need to be convert to string
163    map.insert(
164        "TraceId".into(),
165        VrlValue::Bytes(bytes_to_hex_string(&log.trace_id).into()),
166    );
167    map.insert(
168        "SpanId".into(),
169        VrlValue::Bytes(bytes_to_hex_string(&log.span_id).into()),
170    );
171    map.insert("TraceFlags".into(), VrlValue::Integer(log.flags as i64));
172    map.insert(
173        "SeverityText".into(),
174        VrlValue::Bytes(log.severity_text.into()),
175    );
176    map.insert(
177        "SeverityNumber".into(),
178        VrlValue::Integer(log.severity_number as i64),
179    );
180    // need to be convert to string
181    map.insert(
182        "Body".into(),
183        log.body
184            .as_ref()
185            .map(|x| VrlValue::Bytes(log_body_to_string(x).into()))
186            .unwrap_or(VrlValue::Null),
187    );
188    map.insert("ResourceSchemaUrl".into(), resource_schema_url);
189
190    map.insert("ResourceAttributes".into(), resource_attr);
191    map.insert("ScopeSchemaUrl".into(), scope_schema_url);
192    map.insert("ScopeName".into(), scope_name);
193    map.insert("ScopeVersion".into(), scope_version);
194    map.insert("ScopeAttributes".into(), scope_attrs);
195    map.insert("LogAttributes".into(), log_attrs);
196    VrlValue::Object(map)
197}
198
199fn build_otlp_logs_identity_schema() -> Vec<ColumnSchema> {
200    [
201        (
202            "timestamp",
203            ColumnDataType::TimestampNanosecond,
204            SemanticType::Timestamp,
205            None,
206            None,
207        ),
208        (
209            "trace_id",
210            ColumnDataType::String,
211            SemanticType::Field,
212            None,
213            None,
214        ),
215        (
216            "span_id",
217            ColumnDataType::String,
218            SemanticType::Field,
219            None,
220            None,
221        ),
222        (
223            "severity_text",
224            ColumnDataType::String,
225            SemanticType::Field,
226            None,
227            None,
228        ),
229        (
230            "severity_number",
231            ColumnDataType::Int32,
232            SemanticType::Field,
233            None,
234            None,
235        ),
236        (
237            "body",
238            ColumnDataType::String,
239            SemanticType::Field,
240            None,
241            Some(ColumnOptions {
242                options: std::collections::HashMap::from([(
243                    "fulltext".to_string(),
244                    r#"{"enable":true}"#.to_string(),
245                )]),
246            }),
247        ),
248        (
249            "log_attributes",
250            ColumnDataType::Binary,
251            SemanticType::Field,
252            Some(ColumnDataTypeExtension {
253                type_ext: Some(TypeExt::JsonType(JsonTypeExtension::JsonBinary.into())),
254            }),
255            None,
256        ),
257        (
258            "trace_flags",
259            ColumnDataType::Uint32,
260            SemanticType::Field,
261            None,
262            None,
263        ),
264        (
265            "scope_name",
266            ColumnDataType::String,
267            SemanticType::Tag,
268            None,
269            None,
270        ),
271        (
272            "scope_version",
273            ColumnDataType::String,
274            SemanticType::Field,
275            None,
276            None,
277        ),
278        (
279            "scope_attributes",
280            ColumnDataType::Binary,
281            SemanticType::Field,
282            Some(ColumnDataTypeExtension {
283                type_ext: Some(TypeExt::JsonType(JsonTypeExtension::JsonBinary.into())),
284            }),
285            None,
286        ),
287        (
288            "scope_schema_url",
289            ColumnDataType::String,
290            SemanticType::Field,
291            None,
292            None,
293        ),
294        (
295            "resource_attributes",
296            ColumnDataType::Binary,
297            SemanticType::Field,
298            Some(ColumnDataTypeExtension {
299                type_ext: Some(TypeExt::JsonType(JsonTypeExtension::JsonBinary.into())),
300            }),
301            None,
302        ),
303        (
304            "resource_schema_url",
305            ColumnDataType::String,
306            SemanticType::Field,
307            None,
308            None,
309        ),
310    ]
311    .into_iter()
312    .map(
313        |(field_name, column_type, semantic_type, datatype_extension, options)| ColumnSchema {
314            column_name: field_name.to_string(),
315            datatype: column_type as i32,
316            semantic_type: semantic_type as i32,
317            datatype_extension,
318            options,
319        },
320    )
321    .collect::<Vec<ColumnSchema>>()
322}
323
324#[derive(Clone)]
325struct ExistingLogColumn {
326    schema: ColumnSchema,
327    datatype: ColumnDataType,
328}
329
330impl ExistingLogColumn {
331    fn schema_for_request_type(&self, request_type: ColumnDataType) -> ColumnSchema {
332        let mut schema = self.schema.clone();
333        if request_type == ColumnDataType::Binary && self.is_json_binary() {
334            schema.datatype = ColumnDataType::Binary as i32;
335        }
336        schema
337    }
338
339    fn is_json_binary(&self) -> bool {
340        self.datatype == ColumnDataType::Json
341            && matches!(
342                self.schema
343                    .datatype_extension
344                    .as_ref()
345                    .and_then(|datatype_extension| datatype_extension.type_ext.as_ref()),
346                Some(TypeExt::JsonType(json_type))
347                     if *json_type == JsonTypeExtension::JsonBinary as i32
348            )
349    }
350}
351
352#[derive(Default)]
353struct ExistingLogSchema {
354    columns: HashMap<String, ExistingLogColumn>,
355}
356
357impl ExistingLogSchema {
358    fn try_from_table(table: &table::Table) -> Result<Self> {
359        let table_info = table.table_info();
360        Self::try_from_schema_parts(
361            table.schema_ref().column_schemas(),
362            &table_info.meta.primary_key_indices,
363        )
364    }
365
366    fn try_from_schema_parts(
367        column_schemas: &[datatypes::schema::ColumnSchema],
368        primary_key_indices: &[usize],
369    ) -> Result<Self> {
370        let mut columns = HashMap::with_capacity(column_schemas.len());
371
372        for (index, column_schema) in column_schemas.iter().enumerate() {
373            let (datatype, datatype_extension) =
374                ColumnDataTypeWrapper::try_from(column_schema.data_type.clone())
375                    .map(|wrapper| wrapper.into_parts())
376                    .map_err(Error::from)?;
377            let semantic_type = if column_schema.is_time_index() {
378                SemanticType::Timestamp
379            } else if primary_key_indices.contains(&index) {
380                SemanticType::Tag
381            } else {
382                SemanticType::Field
383            };
384            let schema = ColumnSchema {
385                column_name: column_schema.name.clone(),
386                datatype: datatype as i32,
387                semantic_type: semantic_type as i32,
388                datatype_extension,
389                options: options_from_column_schema(column_schema),
390            };
391            columns.insert(
392                schema.column_name.clone(),
393                ExistingLogColumn { schema, datatype },
394            );
395        }
396
397        Ok(Self { columns })
398    }
399
400    fn get(&self, column_name: &str) -> Option<&ExistingLogColumn> {
401        self.columns.get(column_name)
402    }
403}
404
405fn build_otlp_build_in_row(
406    log: LogRecord,
407    parse_ctx: &mut ParseContext,
408) -> (Row, JsonbValue<'static>) {
409    let log_attr = key_value_to_jsonb(log.attributes);
410    let ts = if log.time_unix_nano != 0 {
411        log.time_unix_nano
412    } else {
413        log.observed_time_unix_nano
414    };
415
416    let row = vec![
417        GreptimeValue {
418            value_data: Some(ValueData::TimestampNanosecondValue(ts as i64)),
419        },
420        GreptimeValue {
421            value_data: Some(ValueData::StringValue(bytes_to_hex_string(&log.trace_id))),
422        },
423        GreptimeValue {
424            value_data: Some(ValueData::StringValue(bytes_to_hex_string(&log.span_id))),
425        },
426        GreptimeValue {
427            value_data: Some(ValueData::StringValue(log.severity_text)),
428        },
429        GreptimeValue {
430            value_data: Some(ValueData::I32Value(log.severity_number)),
431        },
432        GreptimeValue {
433            value_data: log
434                .body
435                .as_ref()
436                .map(|x| ValueData::StringValue(log_body_to_string(x))),
437        },
438        GreptimeValue {
439            value_data: Some(ValueData::BinaryValue(log_attr.to_vec())),
440        },
441        GreptimeValue {
442            value_data: Some(ValueData::U32Value(log.flags)),
443        },
444        GreptimeValue {
445            value_data: parse_ctx.scope_name.clone().map(ValueData::StringValue),
446        },
447        GreptimeValue {
448            value_data: parse_ctx.scope_version.clone().map(ValueData::StringValue),
449        },
450        GreptimeValue {
451            value_data: Some(ValueData::BinaryValue(parse_ctx.scope_attrs.to_vec())),
452        },
453        GreptimeValue {
454            value_data: Some(ValueData::StringValue(parse_ctx.scope_url.clone())),
455        },
456        GreptimeValue {
457            value_data: Some(ValueData::BinaryValue(parse_ctx.resource_attr.to_vec())),
458        },
459        GreptimeValue {
460            value_data: Some(ValueData::StringValue(parse_ctx.resource_url.clone())),
461        },
462    ];
463    (Row { values: row }, log_attr)
464}
465
466fn extract_field_from_attr_and_combine_schema(
467    select_info: &SelectInfo,
468    select_schema: &mut SchemaInfo,
469    attrs: &jsonb::Value,
470    existing_schema: Option<&ExistingLogSchema>,
471    table_name: &str,
472) -> Result<Vec<GreptimeValue>> {
473    // note we use schema.len instead of select_keys.len
474    // because the len of the row value should always matches the len of the schema
475    let mut extracted_values = vec![GreptimeValue::default(); select_schema.schema.len()];
476
477    for key in select_info.keys.iter() {
478        let Some(value) = attrs.get_by_name_ignore_case(key).cloned() else {
479            continue;
480        };
481        let Some((schema, value)) =
482            decide_column_schema_and_convert_value(key, value, existing_schema, table_name)?
483        else {
484            continue;
485        };
486
487        if let Some(index) = select_schema.index.get(key) {
488            let column_schema = &select_schema.schema[*index];
489            let column_schema: ColumnSchema = column_schema.clone().try_into()?;
490            // datatype of the same column name should be the same
491            ensure!(
492                column_schema.datatype == schema.datatype,
493                IncompatibleSchemaSnafu {
494                    column_name: key,
495                    datatype: column_schema.datatype().as_str_name(),
496                    expected: column_schema.datatype,
497                    actual: schema.datatype,
498                }
499            );
500            extracted_values[*index] = value;
501        } else {
502            select_schema.schema.push(schema.into());
503            select_schema
504                .index
505                .insert(key.clone(), select_schema.schema.len() - 1);
506            extracted_values.push(value);
507        }
508    }
509
510    Ok(extracted_values)
511}
512
513fn decide_column_schema_and_convert_value(
514    column_name: &str,
515    value: JsonbValue,
516    existing_schema: Option<&ExistingLogSchema>,
517    table_name: &str,
518) -> Result<Option<(ColumnSchema, GreptimeValue)>> {
519    if let Some(existing_column) = existing_schema.and_then(|schema| schema.get(column_name)) {
520        return decide_existing_column_schema_and_convert_value(
521            column_name,
522            value,
523            existing_column,
524            table_name,
525        );
526    }
527
528    let column_info = match value {
529        JsonbValue::String(s) => Ok(Some((
530            GreptimeValue {
531                value_data: Some(ValueData::StringValue(s.into())),
532            },
533            ColumnDataType::String,
534            SemanticType::Tag,
535            None,
536        ))),
537        JsonbValue::Number(n) => match n {
538            JsonbNumber::Int64(i) => Ok(Some((
539                GreptimeValue {
540                    value_data: Some(ValueData::I64Value(i)),
541                },
542                ColumnDataType::Int64,
543                SemanticType::Tag,
544                None,
545            ))),
546            JsonbNumber::Float64(_) => UnsupportedJsonDataTypeForTagSnafu {
547                ty: "FLOAT".to_string(),
548                key: column_name,
549            }
550            .fail(),
551            JsonbNumber::UInt64(u) => Ok(Some((
552                GreptimeValue {
553                    value_data: Some(ValueData::U64Value(u)),
554                },
555                ColumnDataType::Uint64,
556                SemanticType::Tag,
557                None,
558            ))),
559        },
560        JsonbValue::Bool(b) => Ok(Some((
561            GreptimeValue {
562                value_data: Some(ValueData::BoolValue(b)),
563            },
564            ColumnDataType::Boolean,
565            SemanticType::Tag,
566            None,
567        ))),
568        JsonbValue::Array(_) | JsonbValue::Object(_) => UnsupportedJsonDataTypeForTagSnafu {
569            ty: "Json".to_string(),
570            key: column_name,
571        }
572        .fail(),
573        JsonbValue::Null => Ok(None),
574    };
575    column_info.map(|c| {
576        c.map(|(value, column_type, semantic_type, datatype_extension)| {
577            (
578                ColumnSchema {
579                    column_name: column_name.to_string(),
580                    datatype: column_type as i32,
581                    semantic_type: semantic_type as i32,
582                    datatype_extension,
583                    options: None,
584                },
585                value,
586            )
587        })
588    })
589}
590
591fn decide_existing_column_schema_and_convert_value(
592    column_name: &str,
593    value: JsonbValue,
594    existing_column: &ExistingLogColumn,
595    table_name: &str,
596) -> Result<Option<(ColumnSchema, GreptimeValue)>> {
597    let Some((value_data, request_type)) = jsonb_value_to_log_value_data(column_name, value, true)?
598    else {
599        return Ok(None);
600    };
601    let value_data = coerce_log_value_data(
602        Some(value_data),
603        existing_column.datatype,
604        existing_column.schema.semantic_type(),
605        request_type,
606        existing_column.is_json_binary(),
607        column_name,
608        table_name,
609    )?;
610
611    Ok(Some((
612        existing_column.schema.clone(),
613        GreptimeValue { value_data },
614    )))
615}
616
617fn jsonb_value_to_log_value_data(
618    column_name: &str,
619    value: JsonbValue,
620    allow_float: bool,
621) -> Result<Option<(ValueData, ColumnDataType)>> {
622    match value {
623        JsonbValue::String(s) => Ok(Some((
624            ValueData::StringValue(s.into()),
625            ColumnDataType::String,
626        ))),
627        JsonbValue::Number(n) => match n {
628            JsonbNumber::Int64(i) => Ok(Some((ValueData::I64Value(i), ColumnDataType::Int64))),
629            JsonbNumber::Float64(f) if allow_float => {
630                Ok(Some((ValueData::F64Value(f), ColumnDataType::Float64)))
631            }
632            JsonbNumber::Float64(_) => UnsupportedJsonDataTypeForTagSnafu {
633                ty: "FLOAT".to_string(),
634                key: column_name,
635            }
636            .fail(),
637            JsonbNumber::UInt64(u) => Ok(Some((ValueData::U64Value(u), ColumnDataType::Uint64))),
638        },
639        JsonbValue::Bool(b) => Ok(Some((ValueData::BoolValue(b), ColumnDataType::Boolean))),
640        JsonbValue::Array(_) | JsonbValue::Object(_) => UnsupportedJsonDataTypeForTagSnafu {
641            ty: "Json".to_string(),
642            key: column_name,
643        }
644        .fail(),
645        JsonbValue::Null => Ok(None),
646    }
647}
648
649fn align_rows_with_existing_schema(
650    schemas: &mut [ColumnSchema],
651    rows: &mut [Row],
652    existing_schema: Option<&ExistingLogSchema>,
653    table_name: &str,
654) -> Result<()> {
655    let Some(existing_schema) = existing_schema else {
656        return Ok(());
657    };
658
659    for (column_idx, schema) in schemas.iter_mut().enumerate() {
660        let request_type = schema.datatype();
661        let Some(existing_column) = existing_schema.get(&schema.column_name) else {
662            // Existing tables own their primary key definition; request-only
663            // columns must not expand it.
664            if schema.semantic_type() == SemanticType::Tag {
665                schema.semantic_type = SemanticType::Field as i32;
666            }
667            continue;
668        };
669
670        let target_type = existing_column.datatype;
671        let semantic_type = existing_column.schema.semantic_type();
672        let target_is_json_binary = existing_column.is_json_binary();
673        for row in rows.iter_mut() {
674            let Some(value) = row.values.get_mut(column_idx) else {
675                continue;
676            };
677            value.value_data = coerce_log_value_data(
678                value.value_data.take(),
679                target_type,
680                semantic_type,
681                request_type,
682                target_is_json_binary,
683                &schema.column_name,
684                table_name,
685            )?;
686        }
687        *schema = existing_column.schema_for_request_type(request_type);
688    }
689
690    Ok(())
691}
692
693fn coerce_log_value_data(
694    value_data: Option<ValueData>,
695    target_type: ColumnDataType,
696    _semantic_type: SemanticType,
697    request_type: ColumnDataType,
698    target_is_json_binary: bool,
699    column_name: &str,
700    table_name: &str,
701) -> Result<Option<ValueData>> {
702    let Some(value_data) = value_data else {
703        return Ok(None);
704    };
705
706    if request_type == target_type {
707        return Ok(Some(value_data));
708    }
709
710    if request_type == ColumnDataType::Binary && target_is_json_binary {
711        return Ok(Some(value_data));
712    }
713
714    if is_timestamp_type(request_type)
715        && let Some(target_unit) = timestamp_unit(target_type)
716    {
717        return align_timestamp_value(value_data, target_unit, column_name, table_name).map(Some);
718    }
719
720    if target_type == ColumnDataType::String {
721        if let Ok(value_data) =
722            coerce_value_data(&Some(value_data.clone()), target_type, request_type)
723        {
724            return Ok(value_data);
725        }
726        if let Some(value_data) = stringify_scalar_value(value_data) {
727            return Ok(Some(value_data));
728        }
729    }
730
731    InvalidParameterSnafu {
732        reason: format!(
733            "failed to align log column '{}' in table '{}' from {:?} to {:?}",
734            column_name, table_name, request_type, target_type
735        ),
736    }
737    .fail()
738}
739
740fn stringify_scalar_value(value_data: ValueData) -> Option<ValueData> {
741    let value = match value_data {
742        ValueData::StringValue(value) => value,
743        ValueData::BoolValue(value) => value.to_string(),
744        ValueData::I8Value(value) => value.to_string(),
745        ValueData::I16Value(value) => value.to_string(),
746        ValueData::I32Value(value) => value.to_string(),
747        ValueData::I64Value(value) => value.to_string(),
748        ValueData::U8Value(value) => value.to_string(),
749        ValueData::U16Value(value) => value.to_string(),
750        ValueData::U32Value(value) => value.to_string(),
751        ValueData::U64Value(value) => value.to_string(),
752        ValueData::F32Value(value) => value.to_string(),
753        ValueData::F64Value(value) => value.to_string(),
754        _ => return None,
755    };
756    Some(ValueData::StringValue(value))
757}
758
759fn align_timestamp_value(
760    value_data: ValueData,
761    target_unit: TimeUnit,
762    column_name: &str,
763    table_name: &str,
764) -> Result<ValueData> {
765    let timestamp = match value_data {
766        ValueData::TimestampSecondValue(value) => Timestamp::new_second(value),
767        ValueData::TimestampMillisecondValue(value) => Timestamp::new_millisecond(value),
768        ValueData::TimestampMicrosecondValue(value) => Timestamp::new_microsecond(value),
769        ValueData::TimestampNanosecondValue(value) => Timestamp::new_nanosecond(value),
770        value_data => {
771            return InvalidParameterSnafu {
772                reason: format!(
773                    "failed to align log column '{}' in table '{}' from non-timestamp value {:?}",
774                    column_name, table_name, value_data
775                ),
776            }
777            .fail();
778        }
779    };
780    let timestamp = timestamp.convert_to(target_unit).ok_or_else(|| {
781        InvalidParameterSnafu {
782            reason: format!(
783                "failed to align log column '{}' in table '{}' to timestamp unit {}",
784                column_name, table_name, target_unit
785            ),
786        }
787        .build()
788    })?;
789
790    Ok(match target_unit {
791        TimeUnit::Second => ValueData::TimestampSecondValue(timestamp.value()),
792        TimeUnit::Millisecond => ValueData::TimestampMillisecondValue(timestamp.value()),
793        TimeUnit::Microsecond => ValueData::TimestampMicrosecondValue(timestamp.value()),
794        TimeUnit::Nanosecond => ValueData::TimestampNanosecondValue(timestamp.value()),
795    })
796}
797
798fn is_timestamp_type(datatype: ColumnDataType) -> bool {
799    timestamp_unit(datatype).is_some()
800}
801
802fn timestamp_unit(datatype: ColumnDataType) -> Option<TimeUnit> {
803    match datatype {
804        ColumnDataType::TimestampSecond => Some(TimeUnit::Second),
805        ColumnDataType::TimestampMillisecond => Some(TimeUnit::Millisecond),
806        ColumnDataType::TimestampMicrosecond => Some(TimeUnit::Microsecond),
807        ColumnDataType::TimestampNanosecond => Some(TimeUnit::Nanosecond),
808        _ => None,
809    }
810}
811
812fn parse_export_logs_service_request_to_rows(
813    request: ExportLogsServiceRequest,
814    select_info: Box<SelectInfo>,
815    existing_schema: Option<&ExistingLogSchema>,
816    table_name: &str,
817) -> Result<Rows> {
818    let mut schemas = build_otlp_logs_identity_schema();
819
820    let mut parse_ctx = ParseContext::new(select_info, existing_schema, table_name);
821    let mut rows = parse_resource(&mut parse_ctx, request.resource_logs)?;
822
823    schemas.extend(parse_ctx.select_schema.column_schemas()?);
824    align_rows_with_existing_schema(&mut schemas, &mut rows, existing_schema, table_name)?;
825
826    rows.iter_mut().for_each(|row| {
827        row.values.resize(schemas.len(), GreptimeValue::default());
828    });
829
830    Ok(Rows {
831        schema: schemas,
832        rows,
833    })
834}
835
836fn parse_resource(
837    parse_ctx: &mut ParseContext,
838    resource_logs_vec: Vec<ResourceLogs>,
839) -> Result<Vec<Row>> {
840    let total_len = resource_logs_vec
841        .iter()
842        .flat_map(|r| r.scope_logs.iter())
843        .map(|s| s.log_records.len())
844        .sum();
845
846    let mut results = Vec::with_capacity(total_len);
847
848    for r in resource_logs_vec {
849        parse_ctx.resource_attr = r
850            .resource
851            .map(|resource| key_value_to_jsonb(resource.attributes))
852            .unwrap_or(JsonbValue::Null);
853
854        parse_ctx.resource_url = r.schema_url;
855
856        parse_ctx.resource_uplift_values = extract_field_from_attr_and_combine_schema(
857            &parse_ctx.select_info,
858            &mut parse_ctx.select_schema,
859            &parse_ctx.resource_attr,
860            parse_ctx.existing_schema,
861            parse_ctx.table_name,
862        )?;
863
864        let rows = parse_scope(r.scope_logs, parse_ctx)?;
865        results.extend(rows);
866    }
867    Ok(results)
868}
869
870struct ParseContext<'a> {
871    // input selected keys
872    select_info: Box<SelectInfo>,
873    existing_schema: Option<&'a ExistingLogSchema>,
874    table_name: &'a str,
875    // schema infos for selected keys from resource/scope/log for current request
876    // since the value override from bottom to top, the max capacity is the length of the keys
877    select_schema: SchemaInfo,
878
879    // extracted and uplifted values using select keys
880    resource_uplift_values: Vec<GreptimeValue>,
881    scope_uplift_values: Vec<GreptimeValue>,
882
883    // passdown values
884    resource_url: String,
885    resource_attr: JsonbValue<'a>,
886    scope_name: Option<String>,
887    scope_version: Option<String>,
888    scope_url: String,
889    scope_attrs: JsonbValue<'a>,
890}
891
892impl<'a> ParseContext<'a> {
893    pub fn new(
894        select_info: Box<SelectInfo>,
895        existing_schema: Option<&'a ExistingLogSchema>,
896        table_name: &'a str,
897    ) -> ParseContext<'a> {
898        let len = select_info.keys.len();
899        ParseContext {
900            select_info,
901            existing_schema,
902            table_name,
903            select_schema: SchemaInfo::with_capacity(len),
904            resource_uplift_values: vec![],
905            scope_uplift_values: vec![],
906            resource_url: String::new(),
907            resource_attr: JsonbValue::Null,
908            scope_name: None,
909            scope_version: None,
910            scope_url: String::new(),
911            scope_attrs: JsonbValue::Null,
912        }
913    }
914}
915
916fn parse_scope(scopes_log_vec: Vec<ScopeLogs>, parse_ctx: &mut ParseContext) -> Result<Vec<Row>> {
917    let len = scopes_log_vec.iter().map(|l| l.log_records.len()).sum();
918    let mut results = Vec::with_capacity(len);
919
920    for scope_logs in scopes_log_vec {
921        let (scope_attrs, scope_version, scope_name) = scope_to_jsonb(scope_logs.scope);
922        parse_ctx.scope_name = scope_name;
923        parse_ctx.scope_version = scope_version;
924        parse_ctx.scope_url = scope_logs.schema_url;
925        parse_ctx.scope_attrs = scope_attrs;
926
927        parse_ctx.scope_uplift_values = extract_field_from_attr_and_combine_schema(
928            &parse_ctx.select_info,
929            &mut parse_ctx.select_schema,
930            &parse_ctx.scope_attrs,
931            parse_ctx.existing_schema,
932            parse_ctx.table_name,
933        )?;
934
935        let rows = parse_log(scope_logs.log_records, parse_ctx)?;
936        results.extend(rows);
937    }
938    Ok(results)
939}
940
941fn parse_log(log_records: Vec<LogRecord>, parse_ctx: &mut ParseContext) -> Result<Vec<Row>> {
942    let mut result = Vec::with_capacity(log_records.len());
943
944    for log in log_records {
945        let (mut row, log_attr) = build_otlp_build_in_row(log, parse_ctx);
946
947        let log_values = extract_field_from_attr_and_combine_schema(
948            &parse_ctx.select_info,
949            &mut parse_ctx.select_schema,
950            &log_attr,
951            parse_ctx.existing_schema,
952            parse_ctx.table_name,
953        )?;
954
955        let extracted_values = merge_values(
956            log_values,
957            &parse_ctx.scope_uplift_values,
958            &parse_ctx.resource_uplift_values,
959        );
960
961        row.values.extend(extracted_values);
962
963        result.push(row);
964    }
965    Ok(result)
966}
967
968fn merge_values(
969    log: Vec<GreptimeValue>,
970    scope: &[GreptimeValue],
971    resource: &[GreptimeValue],
972) -> Vec<GreptimeValue> {
973    log.into_iter()
974        .enumerate()
975        .map(|(i, value)| GreptimeValue {
976            value_data: value
977                .value_data
978                .or_else(|| scope.get(i).and_then(|x| x.value_data.clone()))
979                .or_else(|| resource.get(i).and_then(|x| x.value_data.clone())),
980        })
981        .collect()
982}
983
984/// transform otlp logs request to pipeline value
985/// https://opentelemetry.io/docs/concepts/signals/logs/
986fn parse_export_logs_service_request(request: ExportLogsServiceRequest) -> Vec<VrlValue> {
987    let mut result = Vec::new();
988    for r in request.resource_logs {
989        let resource_attr = r
990            .resource
991            .map(|x| VrlValue::Object(key_value_to_map(x.attributes)))
992            .unwrap_or(VrlValue::Null);
993        let resource_schema_url = VrlValue::Bytes(r.schema_url.into());
994        for scope_logs in r.scope_logs {
995            let (scope_attrs, scope_version, scope_name) =
996                scope_to_pipeline_value(scope_logs.scope);
997            let scope_schema_url = VrlValue::Bytes(scope_logs.schema_url.into());
998            for log in scope_logs.log_records {
999                let value = log_to_pipeline_value(
1000                    log,
1001                    resource_schema_url.clone(),
1002                    resource_attr.clone(),
1003                    scope_schema_url.clone(),
1004                    scope_name.clone(),
1005                    scope_version.clone(),
1006                    scope_attrs.clone(),
1007                );
1008                result.push(value);
1009            }
1010        }
1011    }
1012    result
1013}
1014
1015// convert AnyValue to pipeline value
1016fn any_value_to_vrl_value(value: any_value::Value) -> VrlValue {
1017    match value {
1018        any_value::Value::StringValue(s) => VrlValue::Bytes(s.into()),
1019        any_value::Value::IntValue(i) => VrlValue::Integer(i),
1020        any_value::Value::DoubleValue(d) => VrlValue::Float(NotNan::new(d).unwrap()),
1021        any_value::Value::BoolValue(b) => VrlValue::Boolean(b),
1022        any_value::Value::ArrayValue(array_value) => {
1023            let values = array_value
1024                .values
1025                .into_iter()
1026                .filter_map(|v| v.value.map(any_value_to_vrl_value))
1027                .collect();
1028            VrlValue::Array(values)
1029        }
1030        any_value::Value::KvlistValue(key_value_list) => {
1031            VrlValue::Object(key_value_to_map(key_value_list.values))
1032        }
1033        any_value::Value::BytesValue(items) => VrlValue::Bytes(Bytes::from(items)),
1034    }
1035}
1036
1037// convert otlp keyValue vec to map
1038fn key_value_to_map(key_values: Vec<KeyValue>) -> BTreeMap<KeyString, VrlValue> {
1039    let mut map = BTreeMap::new();
1040    for kv in key_values {
1041        let value = match kv.value {
1042            Some(value) => match value.value {
1043                Some(value) => any_value_to_vrl_value(value),
1044                None => VrlValue::Null,
1045            },
1046            None => VrlValue::Null,
1047        };
1048        map.insert(kv.key.into(), value);
1049    }
1050    map
1051}
1052
1053fn log_body_to_string(body: &AnyValue) -> String {
1054    let otlp_value = OtlpAnyValue::from(body);
1055    otlp_value.to_string()
1056}
1057
#[cfg(test)]
mod tests {
    use datatypes::prelude::ConcreteDataType;
    use datatypes::schema::ColumnSchema as DatatypesColumnSchema;
    use opentelemetry_proto::tonic::common::v1::any_value::Value as OtlpValue;

    use super::*;

    /// Non-nullable `timestamp` column flagged as the time index.
    fn time_column(datatype: ConcreteDataType) -> DatatypesColumnSchema {
        let timestamp = DatatypesColumnSchema::new("timestamp", datatype, false);
        timestamp.with_time_index(true)
    }

    /// Nullable column with the given name and type.
    fn column(name: &str, datatype: ConcreteDataType) -> DatatypesColumnSchema {
        DatatypesColumnSchema::new(name, datatype, true)
    }

    /// Builds an `ExistingLogSchema` from columns and primary-key positions.
    fn existing_schema(
        columns: Vec<DatatypesColumnSchema>,
        primary_key_indices: &[usize],
    ) -> ExistingLogSchema {
        ExistingLogSchema::try_from_schema_parts(&columns, primary_key_indices).unwrap()
    }

    /// Existing table made of a nanosecond time index plus one extra column.
    fn nanos_table_with(
        name: &str,
        datatype: ConcreteDataType,
        primary_key_indices: &[usize],
    ) -> ExistingLogSchema {
        existing_schema(
            vec![
                time_column(ConcreteDataType::timestamp_nanosecond_datatype()),
                column(name, datatype),
            ],
            primary_key_indices,
        )
    }

    /// OTLP key/value pair holding the given value.
    fn kv(key: &str, value: OtlpValue) -> KeyValue {
        let value = Some(AnyValue { value: Some(value) });
        KeyValue {
            key: key.to_string(),
            value,
        }
    }

    /// Request with one resource, one scope and one log record carrying `attrs`.
    fn request_with_log_attrs(attrs: Vec<KeyValue>) -> ExportLogsServiceRequest {
        let record = LogRecord {
            time_unix_nano: 1_234_000_000,
            trace_id: vec![1; 16],
            attributes: attrs,
            ..Default::default()
        };
        let scope = ScopeLogs {
            log_records: vec![record],
            ..Default::default()
        };
        let resource = ResourceLogs {
            scope_logs: vec![scope],
            ..Default::default()
        };
        ExportLogsServiceRequest {
            resource_logs: vec![resource],
        }
    }

    /// Request whose single log record has a single `host` attribute.
    fn request_with_host(value: OtlpValue) -> ExportLogsServiceRequest {
        request_with_log_attrs(vec![kv("host", value)])
    }

    /// Converts `request` to rows for table `test_logs` using `select` as the
    /// selected-keys expression.
    fn parse_with_select(
        request: ExportLogsServiceRequest,
        select: &str,
        existing_schema: Option<&ExistingLogSchema>,
    ) -> Result<Rows> {
        let select_info = Box::new(SelectInfo::from(select.to_string()));
        parse_export_logs_service_request_to_rows(
            request,
            select_info,
            existing_schema,
            "test_logs",
        )
    }

    /// Position of the column called `name`; panics when it is absent.
    fn column_index(rows: &Rows, name: &str) -> usize {
        rows.schema
            .iter()
            .position(|col| col.column_name == name)
            .unwrap()
    }

    /// Semantic type of the column called `name`; panics when it is absent.
    fn semantic_type_of(rows: &Rows, name: &str) -> i32 {
        rows.schema[column_index(rows, name)].semantic_type
    }

    #[test]
    fn test_no_existing_table_preserves_direct_schema() {
        let rows = parse_with_select(request_with_log_attrs(vec![]), "", None).unwrap();

        let timestamp = &rows.schema[0];
        assert_eq!(timestamp.column_name, "timestamp");
        assert_eq!(
            timestamp.datatype,
            ColumnDataType::TimestampNanosecond as i32
        );
        assert_eq!(timestamp.semantic_type, SemanticType::Timestamp as i32);
        assert_eq!(
            semantic_type_of(&rows, "scope_name"),
            SemanticType::Tag as i32
        );
    }

    #[test]
    fn test_existing_primary_key_updates_builtin_column_semantic_type() {
        let existing = nanos_table_with("trace_id", ConcreteDataType::string_datatype(), &[1]);

        let rows = parse_with_select(request_with_log_attrs(vec![]), "", Some(&existing)).unwrap();

        assert_eq!(
            semantic_type_of(&rows, "trace_id"),
            SemanticType::Tag as i32
        );
    }

    #[test]
    fn test_existing_string_primary_key_stringifies_selected_scalar_values() {
        let existing = nanos_table_with("host", ConcreteDataType::string_datatype(), &[1]);

        let rows = parse_with_select(
            request_with_host(OtlpValue::IntValue(42)),
            "host",
            Some(&existing),
        )
        .unwrap();
        let host_idx = column_index(&rows, "host");
        let host = &rows.schema[host_idx];

        assert_eq!(host.datatype, ColumnDataType::String as i32);
        assert_eq!(host.semantic_type, SemanticType::Tag as i32);
        assert_eq!(
            rows.rows[0].values[host_idx].value_data,
            Some(ValueData::StringValue("42".to_string()))
        );
    }

    #[test]
    fn test_existing_string_field_stringifies_selected_scalar_values() {
        let existing = nanos_table_with("host", ConcreteDataType::string_datatype(), &[]);

        let rows = parse_with_select(
            request_with_host(OtlpValue::IntValue(42)),
            "host",
            Some(&existing),
        )
        .unwrap();
        let host_idx = column_index(&rows, "host");
        let host = &rows.schema[host_idx];

        assert_eq!(host.datatype, ColumnDataType::String as i32);
        assert_eq!(host.semantic_type, SemanticType::Field as i32);
        assert_eq!(
            rows.rows[0].values[host_idx].value_data,
            Some(ValueData::StringValue("42".to_string()))
        );
    }

    #[test]
    fn test_existing_non_string_primary_key_rejects_incompatible_selected_value() {
        let existing = nanos_table_with("host", ConcreteDataType::int64_datatype(), &[1]);

        let err = parse_with_select(
            request_with_host(OtlpValue::StringValue("node-a".to_string())),
            "host",
            Some(&existing),
        )
        .unwrap_err();

        let message = err.to_string();
        assert!(message.contains("failed to align log column 'host'"));
    }

    #[test]
    fn test_existing_timestamp_unit_is_respected() {
        let existing = existing_schema(
            vec![time_column(
                ConcreteDataType::timestamp_millisecond_datatype(),
            )],
            &[],
        );

        let rows = parse_with_select(request_with_log_attrs(vec![]), "", Some(&existing)).unwrap();

        // 1_234_000_000 ns in the request is expected as 1234 ms in the row.
        assert_eq!(
            rows.schema[0].datatype,
            ColumnDataType::TimestampMillisecond as i32
        );
        assert_eq!(
            rows.rows[0].values[0].value_data,
            Some(ValueData::TimestampMillisecondValue(1234))
        );
    }

    #[test]
    fn test_missing_existing_primary_key_is_not_generated() {
        let existing = nanos_table_with("host", ConcreteDataType::string_datatype(), &[1]);

        let rows = parse_with_select(request_with_log_attrs(vec![]), "", Some(&existing)).unwrap();

        assert!(rows.schema.iter().all(|col| col.column_name != "host"));
    }

    #[test]
    fn test_existing_table_keeps_new_generated_columns_as_fields() {
        let existing = nanos_table_with("trace_id", ConcreteDataType::string_datatype(), &[1]);

        let rows = parse_with_select(
            request_with_host(OtlpValue::StringValue("node-a".to_string())),
            "host",
            Some(&existing),
        )
        .unwrap();
        let host_idx = column_index(&rows, "host");
        let scope_name_idx = column_index(&rows, "scope_name");

        assert_eq!(
            rows.schema[host_idx].semantic_type,
            SemanticType::Field as i32
        );
        assert_eq!(
            rows.schema[scope_name_idx].semantic_type,
            SemanticType::Field as i32
        );
    }
}