servers/otlp/trace/
v1.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashSet;
16
17use api::v1::value::ValueData;
18use api::v1::{ColumnDataType, RowInsertRequests, Value};
19use common_catalog::consts::{trace_operations_table_name, trace_services_table_name};
20use common_grpc::precision::Precision;
21use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest;
22use opentelemetry_proto::tonic::common::v1::any_value::Value as OtlpValue;
23use pipeline::{GreptimePipelineParams, PipelineWay};
24use session::context::QueryContextRef;
25
26use crate::error::Result;
27use crate::otlp::trace::attributes::Attributes;
28use crate::otlp::trace::span::{TraceSpan, parse};
29use crate::otlp::trace::{
30    DURATION_NANO_COLUMN, KEY_SERVICE_NAME, PARENT_SPAN_ID_COLUMN, SERVICE_NAME_COLUMN,
31    SPAN_EVENTS_COLUMN, SPAN_ID_COLUMN, SPAN_KIND_COLUMN, SPAN_NAME_COLUMN, TIMESTAMP_COLUMN,
32    TRACE_ID_COLUMN,
33};
34use crate::otlp::utils::{any_value_to_jsonb, make_column_data, make_string_column_data};
35use crate::query_handler::PipelineHandlerRef;
36use crate::row_writer::{self, MultiTableData, TableData};
37
/// Column-count argument handed to `TableData::new` for every table writer
/// created in this module (presumably a pre-allocation hint, not a limit).
const APPROXIMATE_COLUMN_COUNT: usize = 30;

/// 2100-01-01 00:00:00 UTC expressed in nanoseconds since the Unix epoch.
///
/// Used as a fixed, far-in-the-future timestamp for every row written to the
/// trace services and trace operations tables.
const MAX_TIMESTAMP: i64 = 4_102_444_800_000_000_000;
42
43/// Convert SpanTraces to GreptimeDB row insert requests.
44/// Returns `InsertRequests` and total number of rows to ingest
45///
46/// Compared with v0, this v1 implementation:
47/// 1. flattens all attribute data into columns.
48/// 2. treat `span_id` and `parent_trace_id` as fields.
49/// 3. removed `service_name` column because it's already in
50///    `resource_attributes.service_name`
51///
52/// For other compound data structures like span_links and span_events here we
53/// are still using `json` data structure.
54pub fn v1_to_grpc_insert_requests(
55    request: ExportTraceServiceRequest,
56    _pipeline: PipelineWay,
57    _pipeline_params: GreptimePipelineParams,
58    table_name: String,
59    _query_ctx: &QueryContextRef,
60    _pipeline_handler: PipelineHandlerRef,
61) -> Result<(RowInsertRequests, usize)> {
62    let spans = parse(request);
63    let mut multi_table_writer = MultiTableData::default();
64    let mut trace_writer = TableData::new(APPROXIMATE_COLUMN_COUNT, spans.len());
65    let mut trace_services_writer = TableData::new(APPROXIMATE_COLUMN_COUNT, 1);
66    let mut trace_operations_writer = TableData::new(APPROXIMATE_COLUMN_COUNT, 1);
67
68    let mut services = HashSet::new();
69    let mut operations = HashSet::new();
70    for span in spans {
71        if let Some(service_name) = &span.service_name {
72            // Only insert the service name if it's not already in the set.
73            if !services.contains(service_name) {
74                services.insert(service_name.clone());
75            }
76
77            // Only insert the operation if it's not already in the set.
78            let operation = (
79                service_name.clone(),
80                span.span_name.clone(),
81                span.span_kind.clone(),
82            );
83            if !operations.contains(&operation) {
84                operations.insert(operation);
85            }
86        }
87        write_span_to_row(&mut trace_writer, span)?;
88    }
89    write_trace_services_to_row(&mut trace_services_writer, services)?;
90    write_trace_operations_to_row(&mut trace_operations_writer, operations)?;
91
92    multi_table_writer.add_table_data(
93        trace_services_table_name(&table_name),
94        trace_services_writer,
95    );
96    multi_table_writer.add_table_data(
97        trace_operations_table_name(&table_name),
98        trace_operations_writer,
99    );
100    multi_table_writer.add_table_data(table_name, trace_writer);
101
102    Ok(multi_table_writer.into_row_insert_requests())
103}
104
105pub fn write_span_to_row(writer: &mut TableData, span: TraceSpan) -> Result<()> {
106    let mut row = writer.alloc_one_row();
107
108    // write ts
109    row_writer::write_ts_to_nanos(
110        writer,
111        TIMESTAMP_COLUMN,
112        Some(span.start_in_nanosecond as i64),
113        Precision::Nanosecond,
114        &mut row,
115    )?;
116
117    // write fields
118    let fields = vec![
119        make_column_data(
120            "timestamp_end",
121            ColumnDataType::TimestampNanosecond,
122            Some(ValueData::TimestampNanosecondValue(
123                span.end_in_nanosecond as i64,
124            )),
125        ),
126        make_column_data(
127            DURATION_NANO_COLUMN,
128            ColumnDataType::Uint64,
129            Some(ValueData::U64Value(
130                span.end_in_nanosecond - span.start_in_nanosecond,
131            )),
132        ),
133        make_string_column_data(PARENT_SPAN_ID_COLUMN, span.parent_span_id),
134        make_string_column_data(TRACE_ID_COLUMN, Some(span.trace_id)),
135        make_string_column_data(SPAN_ID_COLUMN, Some(span.span_id)),
136        make_string_column_data(SPAN_KIND_COLUMN, Some(span.span_kind)),
137        make_string_column_data(SPAN_NAME_COLUMN, Some(span.span_name)),
138        make_string_column_data("span_status_code", Some(span.span_status_code)),
139        make_string_column_data("span_status_message", Some(span.span_status_message)),
140        make_string_column_data("trace_state", Some(span.trace_state)),
141        make_string_column_data("scope_name", Some(span.scope_name)),
142        make_string_column_data("scope_version", Some(span.scope_version)),
143    ];
144    row_writer::write_fields(writer, fields.into_iter(), &mut row)?;
145
146    if let Some(service_name) = span.service_name {
147        row_writer::write_tags(
148            writer,
149            std::iter::once((SERVICE_NAME_COLUMN.to_string(), service_name)),
150            &mut row,
151        )?;
152    }
153
154    write_attributes(writer, "span_attributes", span.span_attributes, &mut row)?;
155    write_attributes(writer, "scope_attributes", span.scope_attributes, &mut row)?;
156    write_attributes(
157        writer,
158        "resource_attributes",
159        span.resource_attributes,
160        &mut row,
161    )?;
162
163    row_writer::write_json(
164        writer,
165        SPAN_EVENTS_COLUMN,
166        span.span_events.into(),
167        &mut row,
168    )?;
169    row_writer::write_json(writer, "span_links", span.span_links.into(), &mut row)?;
170
171    writer.add_row(row);
172
173    Ok(())
174}
175
176fn write_trace_services_to_row(writer: &mut TableData, services: HashSet<String>) -> Result<()> {
177    for service_name in services {
178        let mut row = writer.alloc_one_row();
179        // Write the timestamp as 0.
180        row_writer::write_ts_to_nanos(
181            writer,
182            TIMESTAMP_COLUMN,
183            Some(MAX_TIMESTAMP),
184            Precision::Nanosecond,
185            &mut row,
186        )?;
187
188        // Write the `service_name` column.
189        row_writer::write_tags(
190            writer,
191            std::iter::once((SERVICE_NAME_COLUMN.to_string(), service_name)),
192            &mut row,
193        )?;
194        writer.add_row(row);
195    }
196
197    Ok(())
198}
199
200fn write_trace_operations_to_row(
201    writer: &mut TableData,
202    operations: HashSet<(String, String, String)>,
203) -> Result<()> {
204    for (service_name, span_name, span_kind) in operations {
205        let mut row = writer.alloc_one_row();
206        // Write the timestamp as 0.
207        row_writer::write_ts_to_nanos(
208            writer,
209            TIMESTAMP_COLUMN,
210            Some(MAX_TIMESTAMP),
211            Precision::Nanosecond,
212            &mut row,
213        )?;
214
215        // Write the `service_name`, `span_name`, and `span_kind` columns as tags.
216        row_writer::write_tags(
217            writer,
218            vec![
219                (SERVICE_NAME_COLUMN.to_string(), service_name),
220                (SPAN_NAME_COLUMN.to_string(), span_name),
221                (SPAN_KIND_COLUMN.to_string(), span_kind),
222            ]
223            .into_iter(),
224            &mut row,
225        )?;
226        writer.add_row(row);
227    }
228
229    Ok(())
230}
231
232fn write_attributes(
233    writer: &mut TableData,
234    prefix: &str,
235    attributes: Attributes,
236    row: &mut Vec<Value>,
237) -> Result<()> {
238    for attr in attributes.take().into_iter() {
239        let key_suffix = attr.key;
240        // skip resource_attributes.service.name because its already copied to
241        // top level as `SERVICE_NAME_COLUMN`
242        if prefix == "resource_attributes" && key_suffix == KEY_SERVICE_NAME {
243            continue;
244        }
245
246        let key = format!("{}.{}", prefix, key_suffix);
247        match attr.value.and_then(|v| v.value) {
248            Some(OtlpValue::StringValue(v)) => {
249                row_writer::write_fields(
250                    writer,
251                    std::iter::once(make_string_column_data(&key, Some(v))),
252                    row,
253                )?;
254            }
255            Some(OtlpValue::BoolValue(v)) => {
256                row_writer::write_fields(
257                    writer,
258                    std::iter::once(make_column_data(
259                        &key,
260                        ColumnDataType::Boolean,
261                        Some(ValueData::BoolValue(v)),
262                    )),
263                    row,
264                )?;
265            }
266            Some(OtlpValue::IntValue(v)) => {
267                row_writer::write_fields(
268                    writer,
269                    std::iter::once(make_column_data(
270                        &key,
271                        ColumnDataType::Int64,
272                        Some(ValueData::I64Value(v)),
273                    )),
274                    row,
275                )?;
276            }
277            Some(OtlpValue::DoubleValue(v)) => {
278                row_writer::write_fields(
279                    writer,
280                    std::iter::once(make_column_data(
281                        &key,
282                        ColumnDataType::Float64,
283                        Some(ValueData::F64Value(v)),
284                    )),
285                    row,
286                )?;
287            }
288            Some(OtlpValue::ArrayValue(v)) => row_writer::write_json(
289                writer,
290                key,
291                any_value_to_jsonb(OtlpValue::ArrayValue(v)),
292                row,
293            )?,
294            Some(OtlpValue::KvlistValue(v)) => row_writer::write_json(
295                writer,
296                key,
297                any_value_to_jsonb(OtlpValue::KvlistValue(v)),
298                row,
299            )?,
300            Some(OtlpValue::BytesValue(v)) => {
301                row_writer::write_fields(
302                    writer,
303                    std::iter::once(make_column_data(
304                        &key,
305                        ColumnDataType::Binary,
306                        Some(ValueData::BinaryValue(v)),
307                    )),
308                    row,
309                )?;
310            }
311            None => {}
312        }
313    }
314
315    Ok(())
316}