servers/otlp/trace/
v0.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashSet;
16
17use api::v1::value::ValueData;
18use api::v1::{ColumnDataType, RowInsertRequests};
19use common_catalog::consts::{trace_operations_table_name, trace_services_table_name};
20use common_grpc::precision::Precision;
21use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest;
22use pipeline::{GreptimePipelineParams, PipelineWay};
23use session::context::QueryContextRef;
24
25use crate::error::Result;
26use crate::otlp::trace::span::{TraceSpan, parse};
27use crate::otlp::trace::{
28    DURATION_NANO_COLUMN, PARENT_SPAN_ID_COLUMN, SERVICE_NAME_COLUMN, SPAN_ATTRIBUTES_COLUMN,
29    SPAN_EVENTS_COLUMN, SPAN_ID_COLUMN, SPAN_KIND_COLUMN, SPAN_NAME_COLUMN, TIMESTAMP_COLUMN,
30    TRACE_ID_COLUMN,
31};
32use crate::otlp::utils::{make_column_data, make_string_column_data};
33use crate::query_handler::PipelineHandlerRef;
34use crate::row_writer::{self, MultiTableData, TableData};
35
/// Column-count hint handed to `TableData::new` for every table writer created in this module
/// (the span table as well as the services and operations tables).
const APPROXIMATE_COLUMN_COUNT: usize = 24;
37
/// Fixed timestamp, in nanoseconds since the Unix epoch (2100-01-01 00:00:00), written to every
/// row of the trace services and trace operations tables. It is deliberately chosen to be as
/// large as possible.
const MAX_TIMESTAMP: i64 = 4102444800000000000;
40
41/// Convert SpanTraces to GreptimeDB row insert requests.
42/// Returns `InsertRequests` and total number of rows to ingest
43pub fn v0_to_grpc_insert_requests(
44    request: ExportTraceServiceRequest,
45    _pipeline: PipelineWay,
46    _pipeline_params: GreptimePipelineParams,
47    table_name: String,
48    _query_ctx: &QueryContextRef,
49    _pipeline_handler: PipelineHandlerRef,
50) -> Result<(RowInsertRequests, usize)> {
51    let spans = parse(request);
52    let mut multi_table_writer = MultiTableData::default();
53    let mut trace_writer = TableData::new(APPROXIMATE_COLUMN_COUNT, spans.len());
54    let mut trace_services_writer = TableData::new(APPROXIMATE_COLUMN_COUNT, 1);
55    let mut trace_operations_writer = TableData::new(APPROXIMATE_COLUMN_COUNT, 1);
56
57    let mut services = HashSet::new();
58    let mut operations = HashSet::new();
59    for span in spans {
60        if let Some(service_name) = &span.service_name {
61            // Only insert the service name if it's not already in the set.
62            if !services.contains(service_name) {
63                services.insert(service_name.clone());
64            }
65
66            // Collect operations (service_name + span_name + span_kind).
67            let operation = (
68                service_name.clone(),
69                span.span_name.clone(),
70                span.span_kind.clone(),
71            );
72            if !operations.contains(&operation) {
73                operations.insert(operation);
74            }
75        }
76        write_span_to_row(&mut trace_writer, span)?;
77    }
78    write_trace_services_to_row(&mut trace_services_writer, services)?;
79    write_trace_operations_to_row(&mut trace_operations_writer, operations)?;
80
81    multi_table_writer.add_table_data(
82        trace_services_table_name(&table_name),
83        trace_services_writer,
84    );
85    multi_table_writer.add_table_data(
86        trace_operations_table_name(&table_name),
87        trace_operations_writer,
88    );
89    multi_table_writer.add_table_data(table_name, trace_writer);
90
91    Ok(multi_table_writer.into_row_insert_requests())
92}
93
94pub fn write_span_to_row(writer: &mut TableData, span: TraceSpan) -> Result<()> {
95    let mut row = writer.alloc_one_row();
96
97    // write ts
98    row_writer::write_ts_to_nanos(
99        writer,
100        TIMESTAMP_COLUMN,
101        Some(span.start_in_nanosecond as i64),
102        Precision::Nanosecond,
103        &mut row,
104    )?;
105
106    // write fields
107    let fields = vec![
108        make_column_data(
109            "timestamp_end",
110            ColumnDataType::TimestampNanosecond,
111            Some(ValueData::TimestampNanosecondValue(
112                span.end_in_nanosecond as i64,
113            )),
114        ),
115        make_column_data(
116            DURATION_NANO_COLUMN,
117            ColumnDataType::Uint64,
118            Some(ValueData::U64Value(
119                span.end_in_nanosecond - span.start_in_nanosecond,
120            )),
121        ),
122        make_string_column_data(TRACE_ID_COLUMN, Some(span.trace_id)),
123        make_string_column_data(SPAN_ID_COLUMN, Some(span.span_id)),
124        make_string_column_data(PARENT_SPAN_ID_COLUMN, span.parent_span_id),
125        make_string_column_data(SPAN_KIND_COLUMN, Some(span.span_kind)),
126        make_string_column_data(SPAN_NAME_COLUMN, Some(span.span_name)),
127        make_string_column_data("span_status_code", Some(span.span_status_code)),
128        make_string_column_data("span_status_message", Some(span.span_status_message)),
129        make_string_column_data("trace_state", Some(span.trace_state)),
130    ];
131    row_writer::write_fields(writer, fields.into_iter(), &mut row)?;
132
133    if let Some(service_name) = span.service_name {
134        row_writer::write_tag(writer, SERVICE_NAME_COLUMN, service_name, &mut row)?;
135    }
136
137    row_writer::write_json(
138        writer,
139        SPAN_ATTRIBUTES_COLUMN,
140        span.span_attributes.into(),
141        &mut row,
142    )?;
143    row_writer::write_json(
144        writer,
145        SPAN_EVENTS_COLUMN,
146        span.span_events.into(),
147        &mut row,
148    )?;
149    row_writer::write_json(writer, "span_links", span.span_links.into(), &mut row)?;
150
151    // write fields
152    let fields = vec![
153        make_string_column_data("scope_name", Some(span.scope_name)),
154        make_string_column_data("scope_version", Some(span.scope_version)),
155    ];
156    row_writer::write_fields(writer, fields.into_iter(), &mut row)?;
157
158    row_writer::write_json(
159        writer,
160        "scope_attributes",
161        span.scope_attributes.into(),
162        &mut row,
163    )?;
164
165    row_writer::write_json(
166        writer,
167        "resource_attributes",
168        span.resource_attributes.into(),
169        &mut row,
170    )?;
171
172    writer.add_row(row);
173
174    Ok(())
175}
176
177fn write_trace_services_to_row(writer: &mut TableData, services: HashSet<String>) -> Result<()> {
178    for service_name in services {
179        let mut row = writer.alloc_one_row();
180        // Write the timestamp as 0.
181        row_writer::write_ts_to_nanos(
182            writer,
183            TIMESTAMP_COLUMN,
184            Some(MAX_TIMESTAMP),
185            Precision::Nanosecond,
186            &mut row,
187        )?;
188
189        // Write the `service_name` column.
190        row_writer::write_fields(
191            writer,
192            std::iter::once(make_string_column_data(
193                SERVICE_NAME_COLUMN,
194                Some(service_name),
195            )),
196            &mut row,
197        )?;
198        writer.add_row(row);
199    }
200
201    Ok(())
202}
203
204fn write_trace_operations_to_row(
205    writer: &mut TableData,
206    operations: HashSet<(String, String, String)>,
207) -> Result<()> {
208    for (service_name, span_name, span_kind) in operations {
209        let mut row = writer.alloc_one_row();
210        // Write the timestamp as 0.
211        row_writer::write_ts_to_nanos(
212            writer,
213            TIMESTAMP_COLUMN,
214            Some(MAX_TIMESTAMP),
215            Precision::Nanosecond,
216            &mut row,
217        )?;
218
219        // Write the `service_name`, `span_name`, and `span_kind` columns.
220        row_writer::write_fields(
221            writer,
222            vec![
223                make_string_column_data(SERVICE_NAME_COLUMN, Some(service_name)),
224                make_string_column_data(SPAN_NAME_COLUMN, Some(span_name)),
225                make_string_column_data(SPAN_KIND_COLUMN, Some(span_kind)),
226            ]
227            .into_iter(),
228            &mut row,
229        )?;
230        writer.add_row(row);
231    }
232
233    Ok(())
234}