servers/otlp/trace/
v1.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashSet;
16
17use api::v1::value::ValueData;
18use api::v1::{ColumnDataType, RowInsertRequests, Value};
19use common_catalog::consts::trace_services_table_name;
20use common_grpc::precision::Precision;
21use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest;
22use opentelemetry_proto::tonic::common::v1::any_value::Value as OtlpValue;
23use pipeline::{GreptimePipelineParams, PipelineWay};
24use session::context::QueryContextRef;
25
26use crate::error::Result;
27use crate::otlp::trace::attributes::Attributes;
28use crate::otlp::trace::span::{parse, TraceSpan};
29use crate::otlp::trace::{
30    DURATION_NANO_COLUMN, KEY_SERVICE_NAME, PARENT_SPAN_ID_COLUMN, SERVICE_NAME_COLUMN,
31    SPAN_EVENTS_COLUMN, SPAN_ID_COLUMN, SPAN_KIND_COLUMN, SPAN_NAME_COLUMN, TIMESTAMP_COLUMN,
32    TRACE_ID_COLUMN,
33};
34use crate::otlp::utils::{any_value_to_jsonb, make_column_data, make_string_column_data};
35use crate::query_handler::PipelineHandlerRef;
36use crate::row_writer::{self, MultiTableData, TableData};
37
38const APPROXIMATE_COLUMN_COUNT: usize = 30;
39
40/// Convert SpanTraces to GreptimeDB row insert requests.
41/// Returns `InsertRequests` and total number of rows to ingest
42///
43/// Compared with v0, this v1 implementation:
44/// 1. flattens all attribute data into columns.
45/// 2. treat `span_id` and `parent_trace_id` as fields.
46/// 3. removed `service_name` column because it's already in
47///    `resource_attributes.service_name`
48///
49/// For other compound data structures like span_links and span_events here we
50/// are still using `json` data structure.
51pub fn v1_to_grpc_insert_requests(
52    request: ExportTraceServiceRequest,
53    _pipeline: PipelineWay,
54    _pipeline_params: GreptimePipelineParams,
55    table_name: String,
56    _query_ctx: &QueryContextRef,
57    _pipeline_handler: PipelineHandlerRef,
58) -> Result<(RowInsertRequests, usize)> {
59    let spans = parse(request);
60    let mut multi_table_writer = MultiTableData::default();
61    let mut trace_writer = TableData::new(APPROXIMATE_COLUMN_COUNT, spans.len());
62    let mut trace_services_writer = TableData::new(APPROXIMATE_COLUMN_COUNT, 1);
63
64    let mut services = HashSet::new();
65    for span in spans {
66        if let Some(service_name) = &span.service_name {
67            // Only insert the service name if it's not already in the set.
68            if !services.contains(service_name) {
69                services.insert(service_name.clone());
70            }
71        }
72        write_span_to_row(&mut trace_writer, span)?;
73    }
74    write_trace_services_to_row(&mut trace_services_writer, services)?;
75
76    multi_table_writer.add_table_data(
77        trace_services_table_name(&table_name),
78        trace_services_writer,
79    );
80    multi_table_writer.add_table_data(table_name, trace_writer);
81
82    Ok(multi_table_writer.into_row_insert_requests())
83}
84
85pub fn write_span_to_row(writer: &mut TableData, span: TraceSpan) -> Result<()> {
86    let mut row = writer.alloc_one_row();
87
88    // write ts
89    row_writer::write_ts_to_nanos(
90        writer,
91        TIMESTAMP_COLUMN,
92        Some(span.start_in_nanosecond as i64),
93        Precision::Nanosecond,
94        &mut row,
95    )?;
96
97    // write fields
98    let fields = vec![
99        make_column_data(
100            "timestamp_end",
101            ColumnDataType::TimestampNanosecond,
102            Some(ValueData::TimestampNanosecondValue(
103                span.end_in_nanosecond as i64,
104            )),
105        ),
106        make_column_data(
107            DURATION_NANO_COLUMN,
108            ColumnDataType::Uint64,
109            Some(ValueData::U64Value(
110                span.end_in_nanosecond - span.start_in_nanosecond,
111            )),
112        ),
113        make_string_column_data(PARENT_SPAN_ID_COLUMN, span.parent_span_id),
114        make_string_column_data(TRACE_ID_COLUMN, Some(span.trace_id)),
115        make_string_column_data(SPAN_ID_COLUMN, Some(span.span_id)),
116        make_string_column_data(SPAN_KIND_COLUMN, Some(span.span_kind)),
117        make_string_column_data(SPAN_NAME_COLUMN, Some(span.span_name)),
118        make_string_column_data("span_status_code", Some(span.span_status_code)),
119        make_string_column_data("span_status_message", Some(span.span_status_message)),
120        make_string_column_data("trace_state", Some(span.trace_state)),
121        make_string_column_data("scope_name", Some(span.scope_name)),
122        make_string_column_data("scope_version", Some(span.scope_version)),
123    ];
124    row_writer::write_fields(writer, fields.into_iter(), &mut row)?;
125
126    if let Some(service_name) = span.service_name {
127        row_writer::write_tags(
128            writer,
129            std::iter::once((SERVICE_NAME_COLUMN.to_string(), service_name)),
130            &mut row,
131        )?;
132    }
133
134    write_attributes(writer, "span_attributes", span.span_attributes, &mut row)?;
135    write_attributes(writer, "scope_attributes", span.scope_attributes, &mut row)?;
136    write_attributes(
137        writer,
138        "resource_attributes",
139        span.resource_attributes,
140        &mut row,
141    )?;
142
143    row_writer::write_json(
144        writer,
145        SPAN_EVENTS_COLUMN,
146        span.span_events.into(),
147        &mut row,
148    )?;
149    row_writer::write_json(writer, "span_links", span.span_links.into(), &mut row)?;
150
151    writer.add_row(row);
152
153    Ok(())
154}
155
156fn write_trace_services_to_row(writer: &mut TableData, services: HashSet<String>) -> Result<()> {
157    for service_name in services {
158        let mut row = writer.alloc_one_row();
159        // Write the timestamp as 0.
160        row_writer::write_ts_to_nanos(
161            writer,
162            TIMESTAMP_COLUMN,
163            Some(4102444800000000000), // Use a timestamp(2100-01-01 00:00:00) as large as possible.
164            Precision::Nanosecond,
165            &mut row,
166        )?;
167
168        // Write the `service_name` column.
169        row_writer::write_tags(
170            writer,
171            std::iter::once((SERVICE_NAME_COLUMN.to_string(), service_name)),
172            &mut row,
173        )?;
174        writer.add_row(row);
175    }
176
177    Ok(())
178}
179
180fn write_attributes(
181    writer: &mut TableData,
182    prefix: &str,
183    attributes: Attributes,
184    row: &mut Vec<Value>,
185) -> Result<()> {
186    for attr in attributes.take().into_iter() {
187        let key_suffix = attr.key;
188        // skip resource_attributes.service.name because its already copied to
189        // top level as `SERVICE_NAME_COLUMN`
190        if prefix == "resource_attributes" && key_suffix == KEY_SERVICE_NAME {
191            continue;
192        }
193
194        let key = format!("{}.{}", prefix, key_suffix);
195        match attr.value.and_then(|v| v.value) {
196            Some(OtlpValue::StringValue(v)) => {
197                row_writer::write_fields(
198                    writer,
199                    std::iter::once(make_string_column_data(&key, Some(v))),
200                    row,
201                )?;
202            }
203            Some(OtlpValue::BoolValue(v)) => {
204                row_writer::write_fields(
205                    writer,
206                    std::iter::once(make_column_data(
207                        &key,
208                        ColumnDataType::Boolean,
209                        Some(ValueData::BoolValue(v)),
210                    )),
211                    row,
212                )?;
213            }
214            Some(OtlpValue::IntValue(v)) => {
215                row_writer::write_fields(
216                    writer,
217                    std::iter::once(make_column_data(
218                        &key,
219                        ColumnDataType::Int64,
220                        Some(ValueData::I64Value(v)),
221                    )),
222                    row,
223                )?;
224            }
225            Some(OtlpValue::DoubleValue(v)) => {
226                row_writer::write_fields(
227                    writer,
228                    std::iter::once(make_column_data(
229                        &key,
230                        ColumnDataType::Float64,
231                        Some(ValueData::F64Value(v)),
232                    )),
233                    row,
234                )?;
235            }
236            Some(OtlpValue::ArrayValue(v)) => row_writer::write_json(
237                writer,
238                key,
239                any_value_to_jsonb(OtlpValue::ArrayValue(v)),
240                row,
241            )?,
242            Some(OtlpValue::KvlistValue(v)) => row_writer::write_json(
243                writer,
244                key,
245                any_value_to_jsonb(OtlpValue::KvlistValue(v)),
246                row,
247            )?,
248            Some(OtlpValue::BytesValue(v)) => {
249                row_writer::write_fields(
250                    writer,
251                    std::iter::once(make_column_data(
252                        &key,
253                        ColumnDataType::Binary,
254                        Some(ValueData::BinaryValue(v)),
255                    )),
256                    row,
257                )?;
258            }
259            None => {}
260        }
261    }
262
263    Ok(())
264}