servers/otlp/trace/
v1.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashSet;
16
17use api::v1::value::ValueData;
18use api::v1::{ColumnDataType, RowInsertRequests, Value};
19use common_catalog::consts::trace_services_table_name;
20use common_grpc::precision::Precision;
21use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest;
22use opentelemetry_proto::tonic::common::v1::any_value::Value as OtlpValue;
23use pipeline::{GreptimePipelineParams, PipelineWay};
24use session::context::QueryContextRef;
25
26use crate::error::Result;
27use crate::otlp::trace::attributes::Attributes;
28use crate::otlp::trace::span::{parse, TraceSpan};
29use crate::otlp::trace::{
30    DURATION_NANO_COLUMN, KEY_SERVICE_NAME, PARENT_SPAN_ID_COLUMN, SERVICE_NAME_COLUMN,
31    SPAN_EVENTS_COLUMN, SPAN_ID_COLUMN, SPAN_KIND_COLUMN, SPAN_NAME_COLUMN, TIMESTAMP_COLUMN,
32    TRACE_ID_COLUMN,
33};
34use crate::otlp::utils::{any_value_to_jsonb, make_column_data, make_string_column_data};
35use crate::query_handler::PipelineHandlerRef;
36use crate::row_writer::{self, MultiTableData, TableData};
37
/// Column-count argument handed to `TableData::new` for both the span table
/// writer and the trace-services table writer.
// NOTE(review): presumably only a pre-allocation hint rather than a hard
// limit -- confirm against `row_writer::TableData::new`.
const APPROXIMATE_COLUMN_COUNT: usize = 30;
39
40/// Convert SpanTraces to GreptimeDB row insert requests.
41/// Returns `InsertRequests` and total number of rows to ingest
42///
43/// Compared with v0, this v1 implementation:
44/// 1. flattens all attribute data into columns.
45/// 2. treat `span_id` and `parent_trace_id` as fields.
46/// 3. removed `service_name` column because it's already in
47///    `resource_attributes.service_name`
48///
49/// For other compound data structures like span_links and span_events here we
50/// are still using `json` data structure.
51pub fn v1_to_grpc_insert_requests(
52    request: ExportTraceServiceRequest,
53    _pipeline: PipelineWay,
54    _pipeline_params: GreptimePipelineParams,
55    table_name: String,
56    _query_ctx: &QueryContextRef,
57    _pipeline_handler: PipelineHandlerRef,
58) -> Result<(RowInsertRequests, usize)> {
59    let spans = parse(request);
60    let mut multi_table_writer = MultiTableData::default();
61    let mut trace_writer = TableData::new(APPROXIMATE_COLUMN_COUNT, spans.len());
62    let mut trace_services_writer = TableData::new(APPROXIMATE_COLUMN_COUNT, 1);
63
64    let mut services = HashSet::new();
65    for span in spans {
66        if let Some(service_name) = &span.service_name {
67            // Only insert the service name if it's not already in the set.
68            if !services.contains(service_name) {
69                services.insert(service_name.clone());
70            }
71        }
72        write_span_to_row(&mut trace_writer, span)?;
73    }
74    write_trace_services_to_row(&mut trace_services_writer, services)?;
75
76    multi_table_writer.add_table_data(
77        trace_services_table_name(&table_name),
78        trace_services_writer,
79    );
80    multi_table_writer.add_table_data(table_name, trace_writer);
81
82    Ok(multi_table_writer.into_row_insert_requests())
83}
84
85pub fn write_span_to_row(writer: &mut TableData, span: TraceSpan) -> Result<()> {
86    let mut row = writer.alloc_one_row();
87
88    // write ts
89    row_writer::write_ts_to_nanos(
90        writer,
91        TIMESTAMP_COLUMN,
92        Some(span.start_in_nanosecond as i64),
93        Precision::Nanosecond,
94        &mut row,
95    )?;
96    // write ts fields
97    let fields = vec![
98        make_column_data(
99            "timestamp_end",
100            ColumnDataType::TimestampNanosecond,
101            ValueData::TimestampNanosecondValue(span.end_in_nanosecond as i64),
102        ),
103        make_column_data(
104            DURATION_NANO_COLUMN,
105            ColumnDataType::Uint64,
106            ValueData::U64Value(span.end_in_nanosecond - span.start_in_nanosecond),
107        ),
108    ];
109    row_writer::write_fields(writer, fields.into_iter(), &mut row)?;
110
111    // write fields
112    if let Some(parent_span_id) = span.parent_span_id {
113        row_writer::write_fields(
114            writer,
115            std::iter::once(make_string_column_data(
116                PARENT_SPAN_ID_COLUMN,
117                parent_span_id,
118            )),
119            &mut row,
120        )?;
121    }
122
123    let fields = vec![
124        make_string_column_data(TRACE_ID_COLUMN, span.trace_id),
125        make_string_column_data(SPAN_ID_COLUMN, span.span_id),
126        make_string_column_data(SPAN_KIND_COLUMN, span.span_kind),
127        make_string_column_data(SPAN_NAME_COLUMN, span.span_name),
128        make_string_column_data("span_status_code", span.span_status_code),
129        make_string_column_data("span_status_message", span.span_status_message),
130        make_string_column_data("trace_state", span.trace_state),
131        make_string_column_data("scope_name", span.scope_name),
132        make_string_column_data("scope_version", span.scope_version),
133    ];
134    row_writer::write_fields(writer, fields.into_iter(), &mut row)?;
135
136    if let Some(service_name) = span.service_name {
137        row_writer::write_tags(
138            writer,
139            std::iter::once((SERVICE_NAME_COLUMN.to_string(), service_name)),
140            &mut row,
141        )?;
142    }
143
144    write_attributes(writer, "span_attributes", span.span_attributes, &mut row)?;
145    write_attributes(writer, "scope_attributes", span.scope_attributes, &mut row)?;
146    write_attributes(
147        writer,
148        "resource_attributes",
149        span.resource_attributes,
150        &mut row,
151    )?;
152
153    row_writer::write_json(
154        writer,
155        SPAN_EVENTS_COLUMN,
156        span.span_events.into(),
157        &mut row,
158    )?;
159    row_writer::write_json(writer, "span_links", span.span_links.into(), &mut row)?;
160
161    writer.add_row(row);
162
163    Ok(())
164}
165
166fn write_trace_services_to_row(writer: &mut TableData, services: HashSet<String>) -> Result<()> {
167    for service_name in services {
168        let mut row = writer.alloc_one_row();
169        // Write the timestamp as 0.
170        row_writer::write_ts_to_nanos(
171            writer,
172            TIMESTAMP_COLUMN,
173            Some(4102444800000000000), // Use a timestamp(2100-01-01 00:00:00) as large as possible.
174            Precision::Nanosecond,
175            &mut row,
176        )?;
177
178        // Write the `service_name` column.
179        row_writer::write_tags(
180            writer,
181            std::iter::once((SERVICE_NAME_COLUMN.to_string(), service_name)),
182            &mut row,
183        )?;
184        writer.add_row(row);
185    }
186
187    Ok(())
188}
189
190fn write_attributes(
191    writer: &mut TableData,
192    prefix: &str,
193    attributes: Attributes,
194    row: &mut Vec<Value>,
195) -> Result<()> {
196    for attr in attributes.take().into_iter() {
197        let key_suffix = attr.key;
198        // skip resource_attributes.service.name because its already copied to
199        // top level as `SERVICE_NAME_COLUMN`
200        if prefix == "resource_attributes" && key_suffix == KEY_SERVICE_NAME {
201            continue;
202        }
203
204        let key = format!("{}.{}", prefix, key_suffix);
205        match attr.value.and_then(|v| v.value) {
206            Some(OtlpValue::StringValue(v)) => {
207                row_writer::write_fields(
208                    writer,
209                    std::iter::once(make_string_column_data(&key, v)),
210                    row,
211                )?;
212            }
213            Some(OtlpValue::BoolValue(v)) => {
214                row_writer::write_fields(
215                    writer,
216                    std::iter::once(make_column_data(
217                        &key,
218                        ColumnDataType::Boolean,
219                        ValueData::BoolValue(v),
220                    )),
221                    row,
222                )?;
223            }
224            Some(OtlpValue::IntValue(v)) => {
225                row_writer::write_fields(
226                    writer,
227                    std::iter::once(make_column_data(
228                        &key,
229                        ColumnDataType::Int64,
230                        ValueData::I64Value(v),
231                    )),
232                    row,
233                )?;
234            }
235            Some(OtlpValue::DoubleValue(v)) => {
236                row_writer::write_fields(
237                    writer,
238                    std::iter::once(make_column_data(
239                        &key,
240                        ColumnDataType::Float64,
241                        ValueData::F64Value(v),
242                    )),
243                    row,
244                )?;
245            }
246            Some(OtlpValue::ArrayValue(v)) => row_writer::write_json(
247                writer,
248                key,
249                any_value_to_jsonb(OtlpValue::ArrayValue(v)),
250                row,
251            )?,
252            Some(OtlpValue::KvlistValue(v)) => row_writer::write_json(
253                writer,
254                key,
255                any_value_to_jsonb(OtlpValue::KvlistValue(v)),
256                row,
257            )?,
258            Some(OtlpValue::BytesValue(v)) => {
259                row_writer::write_fields(
260                    writer,
261                    std::iter::once(make_column_data(
262                        &key,
263                        ColumnDataType::Binary,
264                        ValueData::BinaryValue(v),
265                    )),
266                    row,
267                )?;
268            }
269            None => {}
270        }
271    }
272
273    Ok(())
274}