servers/otlp/trace/
v1.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashSet;
16
17use api::v1::value::ValueData;
18use api::v1::{ColumnDataType, RowInsertRequests, Value};
19use common_catalog::consts::{trace_operations_table_name, trace_services_table_name};
20use common_grpc::precision::Precision;
21use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest;
22use opentelemetry_proto::tonic::common::v1::any_value::Value as OtlpValue;
23use pipeline::{GreptimePipelineParams, PipelineWay};
24use session::context::QueryContextRef;
25
26use crate::error::Result;
27use crate::otlp::trace::attributes::Attributes;
28use crate::otlp::trace::span::{TraceSpan, parse};
29use crate::otlp::trace::{
30    DURATION_NANO_COLUMN, KEY_SERVICE_NAME, PARENT_SPAN_ID_COLUMN, SCOPE_NAME_COLUMN,
31    SCOPE_VERSION_COLUMN, SERVICE_NAME_COLUMN, SPAN_EVENTS_COLUMN, SPAN_ID_COLUMN,
32    SPAN_KIND_COLUMN, SPAN_NAME_COLUMN, SPAN_STATUS_CODE, SPAN_STATUS_MESSAGE_COLUMN,
33    TIMESTAMP_COLUMN, TRACE_ID_COLUMN, TRACE_STATE_COLUMN,
34};
35use crate::otlp::utils::{any_value_to_jsonb, make_column_data, make_string_column_data};
36use crate::query_handler::PipelineHandlerRef;
37use crate::row_writer::{self, MultiTableData, TableData};
38
/// Approximate number of columns per table; passed as the first argument of
/// `TableData::new` for every table buffer created in this module.
const APPROXIMATE_COLUMN_COUNT: usize = 30;

/// 2100-01-01T00:00:00Z in nanoseconds since the Unix epoch
/// (4_102_444_800 seconds multiplied by 10^9).
///
/// Used as the fixed time-index value for every row written to the trace
/// services and trace operations tables.
// NOTE(review): presumably a constant far-future value is chosen so repeated
// writes of the same service/operation land on the same timestamp and are not
// aged out by TTL — confirm against the table engine's dedup/TTL semantics.
const MAX_TIMESTAMP: i64 = 4_102_444_800 * 1_000_000_000;
43
44/// Convert SpanTraces to GreptimeDB row insert requests.
45/// Returns `InsertRequests` and total number of rows to ingest
46///
47/// Compared with v0, this v1 implementation:
48/// 1. flattens all attribute data into columns.
49/// 2. treat `span_id` and `parent_trace_id` as fields.
50/// 3. removed `service_name` column because it's already in
51///    `resource_attributes.service_name`
52///
53/// For other compound data structures like span_links and span_events here we
54/// are still using `json` data structure.
55pub fn v1_to_grpc_insert_requests(
56    request: ExportTraceServiceRequest,
57    _pipeline: PipelineWay,
58    _pipeline_params: GreptimePipelineParams,
59    table_name: String,
60    _query_ctx: &QueryContextRef,
61    _pipeline_handler: PipelineHandlerRef,
62) -> Result<(RowInsertRequests, usize)> {
63    let spans = parse(request);
64    let mut multi_table_writer = MultiTableData::default();
65    let mut trace_writer = TableData::new(APPROXIMATE_COLUMN_COUNT, spans.len());
66    let mut trace_services_writer = TableData::new(APPROXIMATE_COLUMN_COUNT, 1);
67    let mut trace_operations_writer = TableData::new(APPROXIMATE_COLUMN_COUNT, 1);
68
69    let mut services = HashSet::new();
70    let mut operations = HashSet::new();
71    for span in spans {
72        if let Some(service_name) = &span.service_name {
73            // Only insert the service name if it's not already in the set.
74            if !services.contains(service_name) {
75                services.insert(service_name.clone());
76            }
77
78            // Only insert the operation if it's not already in the set.
79            let operation = (
80                service_name.clone(),
81                span.span_name.clone(),
82                span.span_kind.clone(),
83            );
84            if !operations.contains(&operation) {
85                operations.insert(operation);
86            }
87        }
88        write_span_to_row(&mut trace_writer, span)?;
89    }
90    write_trace_services_to_row(&mut trace_services_writer, services)?;
91    write_trace_operations_to_row(&mut trace_operations_writer, operations)?;
92
93    multi_table_writer.add_table_data(
94        trace_services_table_name(&table_name),
95        trace_services_writer,
96    );
97    multi_table_writer.add_table_data(
98        trace_operations_table_name(&table_name),
99        trace_operations_writer,
100    );
101    multi_table_writer.add_table_data(table_name, trace_writer);
102
103    Ok(multi_table_writer.into_row_insert_requests())
104}
105
106pub fn write_span_to_row(writer: &mut TableData, span: TraceSpan) -> Result<()> {
107    let mut row = writer.alloc_one_row();
108
109    // write ts
110    row_writer::write_ts_to_nanos(
111        writer,
112        TIMESTAMP_COLUMN,
113        Some(span.start_in_nanosecond as i64),
114        Precision::Nanosecond,
115        &mut row,
116    )?;
117
118    // write fields
119    let fields = vec![
120        make_column_data(
121            "timestamp_end",
122            ColumnDataType::TimestampNanosecond,
123            Some(ValueData::TimestampNanosecondValue(
124                span.end_in_nanosecond as i64,
125            )),
126        ),
127        make_column_data(
128            DURATION_NANO_COLUMN,
129            ColumnDataType::Uint64,
130            Some(ValueData::U64Value(
131                span.end_in_nanosecond - span.start_in_nanosecond,
132            )),
133        ),
134        make_string_column_data(PARENT_SPAN_ID_COLUMN, span.parent_span_id),
135        make_string_column_data(TRACE_ID_COLUMN, Some(span.trace_id)),
136        make_string_column_data(SPAN_ID_COLUMN, Some(span.span_id)),
137        make_string_column_data(SPAN_KIND_COLUMN, Some(span.span_kind)),
138        make_string_column_data(SPAN_NAME_COLUMN, Some(span.span_name)),
139        make_string_column_data(SPAN_STATUS_CODE, Some(span.span_status_code)),
140        make_string_column_data(SPAN_STATUS_MESSAGE_COLUMN, Some(span.span_status_message)),
141        make_string_column_data(TRACE_STATE_COLUMN, Some(span.trace_state)),
142        make_string_column_data(SCOPE_NAME_COLUMN, Some(span.scope_name)),
143        make_string_column_data(SCOPE_VERSION_COLUMN, Some(span.scope_version)),
144    ];
145    row_writer::write_fields(writer, fields.into_iter(), &mut row)?;
146
147    if let Some(service_name) = span.service_name {
148        row_writer::write_tags(
149            writer,
150            std::iter::once((SERVICE_NAME_COLUMN.to_string(), service_name)),
151            &mut row,
152        )?;
153    }
154
155    write_attributes(writer, "span_attributes", span.span_attributes, &mut row)?;
156    write_attributes(writer, "scope_attributes", span.scope_attributes, &mut row)?;
157    write_attributes(
158        writer,
159        "resource_attributes",
160        span.resource_attributes,
161        &mut row,
162    )?;
163
164    row_writer::write_json(
165        writer,
166        SPAN_EVENTS_COLUMN,
167        span.span_events.into(),
168        &mut row,
169    )?;
170    row_writer::write_json(writer, "span_links", span.span_links.into(), &mut row)?;
171
172    writer.add_row(row);
173
174    Ok(())
175}
176
177fn write_trace_services_to_row(writer: &mut TableData, services: HashSet<String>) -> Result<()> {
178    for service_name in services {
179        let mut row = writer.alloc_one_row();
180        // Write the timestamp as 0.
181        row_writer::write_ts_to_nanos(
182            writer,
183            TIMESTAMP_COLUMN,
184            Some(MAX_TIMESTAMP),
185            Precision::Nanosecond,
186            &mut row,
187        )?;
188
189        // Write the `service_name` column.
190        row_writer::write_tags(
191            writer,
192            std::iter::once((SERVICE_NAME_COLUMN.to_string(), service_name)),
193            &mut row,
194        )?;
195        writer.add_row(row);
196    }
197
198    Ok(())
199}
200
201fn write_trace_operations_to_row(
202    writer: &mut TableData,
203    operations: HashSet<(String, String, String)>,
204) -> Result<()> {
205    for (service_name, span_name, span_kind) in operations {
206        let mut row = writer.alloc_one_row();
207        // Write the timestamp as 0.
208        row_writer::write_ts_to_nanos(
209            writer,
210            TIMESTAMP_COLUMN,
211            Some(MAX_TIMESTAMP),
212            Precision::Nanosecond,
213            &mut row,
214        )?;
215
216        // Write the `service_name`, `span_name`, and `span_kind` columns as tags.
217        row_writer::write_tags(
218            writer,
219            vec![
220                (SERVICE_NAME_COLUMN.to_string(), service_name),
221                (SPAN_NAME_COLUMN.to_string(), span_name),
222                (SPAN_KIND_COLUMN.to_string(), span_kind),
223            ]
224            .into_iter(),
225            &mut row,
226        )?;
227        writer.add_row(row);
228    }
229
230    Ok(())
231}
232
233fn write_attributes(
234    writer: &mut TableData,
235    prefix: &str,
236    attributes: Attributes,
237    row: &mut Vec<Value>,
238) -> Result<()> {
239    for attr in attributes.take().into_iter() {
240        let key_suffix = attr.key;
241        // skip resource_attributes.service.name because its already copied to
242        // top level as `SERVICE_NAME_COLUMN`
243        if prefix == "resource_attributes" && key_suffix == KEY_SERVICE_NAME {
244            continue;
245        }
246
247        let key = format!("{}.{}", prefix, key_suffix);
248        match attr.value.and_then(|v| v.value) {
249            Some(OtlpValue::StringValue(v)) => {
250                row_writer::write_fields(
251                    writer,
252                    std::iter::once(make_string_column_data(&key, Some(v))),
253                    row,
254                )?;
255            }
256            Some(OtlpValue::BoolValue(v)) => {
257                row_writer::write_fields(
258                    writer,
259                    std::iter::once(make_column_data(
260                        &key,
261                        ColumnDataType::Boolean,
262                        Some(ValueData::BoolValue(v)),
263                    )),
264                    row,
265                )?;
266            }
267            Some(OtlpValue::IntValue(v)) => {
268                row_writer::write_fields(
269                    writer,
270                    std::iter::once(make_column_data(
271                        &key,
272                        ColumnDataType::Int64,
273                        Some(ValueData::I64Value(v)),
274                    )),
275                    row,
276                )?;
277            }
278            Some(OtlpValue::DoubleValue(v)) => {
279                row_writer::write_fields(
280                    writer,
281                    std::iter::once(make_column_data(
282                        &key,
283                        ColumnDataType::Float64,
284                        Some(ValueData::F64Value(v)),
285                    )),
286                    row,
287                )?;
288            }
289            Some(OtlpValue::ArrayValue(v)) => row_writer::write_json(
290                writer,
291                key,
292                any_value_to_jsonb(OtlpValue::ArrayValue(v)),
293                row,
294            )?,
295            Some(OtlpValue::KvlistValue(v)) => row_writer::write_json(
296                writer,
297                key,
298                any_value_to_jsonb(OtlpValue::KvlistValue(v)),
299                row,
300            )?,
301            Some(OtlpValue::BytesValue(v)) => {
302                row_writer::write_fields(
303                    writer,
304                    std::iter::once(make_column_data(
305                        &key,
306                        ColumnDataType::Binary,
307                        Some(ValueData::BinaryValue(v)),
308                    )),
309                    row,
310                )?;
311            }
312            None => {}
313        }
314    }
315
316    Ok(())
317}