servers/otlp/trace/
v0.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashSet;
16
17use api::v1::value::ValueData;
18use api::v1::{ColumnDataType, RowInsertRequests};
19use common_catalog::consts::trace_services_table_name;
20use common_grpc::precision::Precision;
21use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest;
22use pipeline::{GreptimePipelineParams, PipelineWay};
23use session::context::QueryContextRef;
24
25use crate::error::Result;
26use crate::otlp::trace::span::{parse, TraceSpan};
27use crate::otlp::trace::{
28    DURATION_NANO_COLUMN, PARENT_SPAN_ID_COLUMN, SERVICE_NAME_COLUMN, SPAN_ATTRIBUTES_COLUMN,
29    SPAN_EVENTS_COLUMN, SPAN_ID_COLUMN, SPAN_KIND_COLUMN, SPAN_NAME_COLUMN, TIMESTAMP_COLUMN,
30    TRACE_ID_COLUMN,
31};
32use crate::otlp::utils::{make_column_data, make_string_column_data};
33use crate::query_handler::PipelineHandlerRef;
34use crate::row_writer::{self, MultiTableData, TableData};
35
36const APPROXIMATE_COLUMN_COUNT: usize = 24;
37
38/// Convert SpanTraces to GreptimeDB row insert requests.
39/// Returns `InsertRequests` and total number of rows to ingest
40pub fn v0_to_grpc_insert_requests(
41    request: ExportTraceServiceRequest,
42    _pipeline: PipelineWay,
43    _pipeline_params: GreptimePipelineParams,
44    table_name: String,
45    _query_ctx: &QueryContextRef,
46    _pipeline_handler: PipelineHandlerRef,
47) -> Result<(RowInsertRequests, usize)> {
48    let spans = parse(request);
49    let mut multi_table_writer = MultiTableData::default();
50    let mut trace_writer = TableData::new(APPROXIMATE_COLUMN_COUNT, spans.len());
51    let mut trace_services_writer = TableData::new(APPROXIMATE_COLUMN_COUNT, 1);
52
53    let mut services = HashSet::new();
54    for span in spans {
55        if let Some(service_name) = &span.service_name {
56            // Only insert the service name if it's not already in the set.
57            if !services.contains(service_name) {
58                services.insert(service_name.clone());
59            }
60        }
61        write_span_to_row(&mut trace_writer, span)?;
62    }
63    write_trace_services_to_row(&mut trace_services_writer, services)?;
64
65    multi_table_writer.add_table_data(
66        trace_services_table_name(&table_name),
67        trace_services_writer,
68    );
69    multi_table_writer.add_table_data(table_name, trace_writer);
70
71    Ok(multi_table_writer.into_row_insert_requests())
72}
73
74pub fn write_span_to_row(writer: &mut TableData, span: TraceSpan) -> Result<()> {
75    let mut row = writer.alloc_one_row();
76
77    // write ts
78    row_writer::write_ts_to_nanos(
79        writer,
80        TIMESTAMP_COLUMN,
81        Some(span.start_in_nanosecond as i64),
82        Precision::Nanosecond,
83        &mut row,
84    )?;
85    // write ts fields
86    let fields = vec![
87        make_column_data(
88            "timestamp_end",
89            ColumnDataType::TimestampNanosecond,
90            ValueData::TimestampNanosecondValue(span.end_in_nanosecond as i64),
91        ),
92        make_column_data(
93            DURATION_NANO_COLUMN,
94            ColumnDataType::Uint64,
95            ValueData::U64Value(span.end_in_nanosecond - span.start_in_nanosecond),
96        ),
97    ];
98    row_writer::write_fields(writer, fields.into_iter(), &mut row)?;
99
100    if let Some(service_name) = span.service_name {
101        row_writer::write_tag(writer, SERVICE_NAME_COLUMN, service_name, &mut row)?;
102    }
103
104    // write fields
105    let fields = vec![
106        make_string_column_data(TRACE_ID_COLUMN, span.trace_id),
107        make_string_column_data(SPAN_ID_COLUMN, span.span_id),
108        make_string_column_data(
109            PARENT_SPAN_ID_COLUMN,
110            span.parent_span_id.unwrap_or_default(),
111        ),
112        make_string_column_data(SPAN_KIND_COLUMN, span.span_kind),
113        make_string_column_data(SPAN_NAME_COLUMN, span.span_name),
114        make_string_column_data("span_status_code", span.span_status_code),
115        make_string_column_data("span_status_message", span.span_status_message),
116        make_string_column_data("trace_state", span.trace_state),
117    ];
118    row_writer::write_fields(writer, fields.into_iter(), &mut row)?;
119
120    row_writer::write_json(
121        writer,
122        SPAN_ATTRIBUTES_COLUMN,
123        span.span_attributes.into(),
124        &mut row,
125    )?;
126    row_writer::write_json(
127        writer,
128        SPAN_EVENTS_COLUMN,
129        span.span_events.into(),
130        &mut row,
131    )?;
132    row_writer::write_json(writer, "span_links", span.span_links.into(), &mut row)?;
133
134    // write fields
135    let fields = vec![
136        make_string_column_data("scope_name", span.scope_name),
137        make_string_column_data("scope_version", span.scope_version),
138    ];
139    row_writer::write_fields(writer, fields.into_iter(), &mut row)?;
140
141    row_writer::write_json(
142        writer,
143        "scope_attributes",
144        span.scope_attributes.into(),
145        &mut row,
146    )?;
147
148    row_writer::write_json(
149        writer,
150        "resource_attributes",
151        span.resource_attributes.into(),
152        &mut row,
153    )?;
154
155    writer.add_row(row);
156
157    Ok(())
158}
159
160fn write_trace_services_to_row(writer: &mut TableData, services: HashSet<String>) -> Result<()> {
161    for service_name in services {
162        let mut row = writer.alloc_one_row();
163        // Write the timestamp as 0.
164        row_writer::write_ts_to_nanos(
165            writer,
166            TIMESTAMP_COLUMN,
167            Some(4102444800000000000), // Use a timestamp(2100-01-01 00:00:00) as large as possible.
168            Precision::Nanosecond,
169            &mut row,
170        )?;
171
172        // Write the `service_name` column.
173        row_writer::write_fields(
174            writer,
175            std::iter::once(make_string_column_data(SERVICE_NAME_COLUMN, service_name)),
176            &mut row,
177        )?;
178        writer.add_row(row);
179    }
180
181    Ok(())
182}