servers/otlp/trace/v0.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashSet;
16
17use api::v1::value::ValueData;
18use api::v1::{ColumnDataType, RowInsertRequests};
19use common_catalog::consts::trace_services_table_name;
20use common_grpc::precision::Precision;
21use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest;
22use pipeline::{GreptimePipelineParams, PipelineWay};
23use session::context::QueryContextRef;
24
25use crate::error::Result;
26use crate::otlp::trace::span::{parse, TraceSpan};
27use crate::otlp::trace::{
28    DURATION_NANO_COLUMN, PARENT_SPAN_ID_COLUMN, SERVICE_NAME_COLUMN, SPAN_ATTRIBUTES_COLUMN,
29    SPAN_EVENTS_COLUMN, SPAN_ID_COLUMN, SPAN_KIND_COLUMN, SPAN_NAME_COLUMN, TIMESTAMP_COLUMN,
30    TRACE_ID_COLUMN,
31};
32use crate::otlp::utils::{make_column_data, make_string_column_data};
33use crate::query_handler::PipelineHandlerRef;
34use crate::row_writer::{self, MultiTableData, TableData};
35
36const APPROXIMATE_COLUMN_COUNT: usize = 24;
37
38/// Convert SpanTraces to GreptimeDB row insert requests.
39/// Returns `InsertRequests` and total number of rows to ingest
40pub fn v0_to_grpc_insert_requests(
41    request: ExportTraceServiceRequest,
42    _pipeline: PipelineWay,
43    _pipeline_params: GreptimePipelineParams,
44    table_name: String,
45    _query_ctx: &QueryContextRef,
46    _pipeline_handler: PipelineHandlerRef,
47) -> Result<(RowInsertRequests, usize)> {
48    let spans = parse(request);
49    let mut multi_table_writer = MultiTableData::default();
50    let mut trace_writer = TableData::new(APPROXIMATE_COLUMN_COUNT, spans.len());
51    let mut trace_services_writer = TableData::new(APPROXIMATE_COLUMN_COUNT, 1);
52
53    let mut services = HashSet::new();
54    for span in spans {
55        if let Some(service_name) = &span.service_name {
56            // Only insert the service name if it's not already in the set.
57            if !services.contains(service_name) {
58                services.insert(service_name.clone());
59            }
60        }
61        write_span_to_row(&mut trace_writer, span)?;
62    }
63    write_trace_services_to_row(&mut trace_services_writer, services)?;
64
65    multi_table_writer.add_table_data(
66        trace_services_table_name(&table_name),
67        trace_services_writer,
68    );
69    multi_table_writer.add_table_data(table_name, trace_writer);
70
71    Ok(multi_table_writer.into_row_insert_requests())
72}
73
74pub fn write_span_to_row(writer: &mut TableData, span: TraceSpan) -> Result<()> {
75    let mut row = writer.alloc_one_row();
76
77    // write ts
78    row_writer::write_ts_to_nanos(
79        writer,
80        TIMESTAMP_COLUMN,
81        Some(span.start_in_nanosecond as i64),
82        Precision::Nanosecond,
83        &mut row,
84    )?;
85
86    // write fields
87    let fields = vec![
88        make_column_data(
89            "timestamp_end",
90            ColumnDataType::TimestampNanosecond,
91            Some(ValueData::TimestampNanosecondValue(
92                span.end_in_nanosecond as i64,
93            )),
94        ),
95        make_column_data(
96            DURATION_NANO_COLUMN,
97            ColumnDataType::Uint64,
98            Some(ValueData::U64Value(
99                span.end_in_nanosecond - span.start_in_nanosecond,
100            )),
101        ),
102        make_string_column_data(TRACE_ID_COLUMN, Some(span.trace_id)),
103        make_string_column_data(SPAN_ID_COLUMN, Some(span.span_id)),
104        make_string_column_data(PARENT_SPAN_ID_COLUMN, span.parent_span_id),
105        make_string_column_data(SPAN_KIND_COLUMN, Some(span.span_kind)),
106        make_string_column_data(SPAN_NAME_COLUMN, Some(span.span_name)),
107        make_string_column_data("span_status_code", Some(span.span_status_code)),
108        make_string_column_data("span_status_message", Some(span.span_status_message)),
109        make_string_column_data("trace_state", Some(span.trace_state)),
110    ];
111    row_writer::write_fields(writer, fields.into_iter(), &mut row)?;
112
113    if let Some(service_name) = span.service_name {
114        row_writer::write_tag(writer, SERVICE_NAME_COLUMN, service_name, &mut row)?;
115    }
116
117    row_writer::write_json(
118        writer,
119        SPAN_ATTRIBUTES_COLUMN,
120        span.span_attributes.into(),
121        &mut row,
122    )?;
123    row_writer::write_json(
124        writer,
125        SPAN_EVENTS_COLUMN,
126        span.span_events.into(),
127        &mut row,
128    )?;
129    row_writer::write_json(writer, "span_links", span.span_links.into(), &mut row)?;
130
131    // write fields
132    let fields = vec![
133        make_string_column_data("scope_name", Some(span.scope_name)),
134        make_string_column_data("scope_version", Some(span.scope_version)),
135    ];
136    row_writer::write_fields(writer, fields.into_iter(), &mut row)?;
137
138    row_writer::write_json(
139        writer,
140        "scope_attributes",
141        span.scope_attributes.into(),
142        &mut row,
143    )?;
144
145    row_writer::write_json(
146        writer,
147        "resource_attributes",
148        span.resource_attributes.into(),
149        &mut row,
150    )?;
151
152    writer.add_row(row);
153
154    Ok(())
155}
156
157fn write_trace_services_to_row(writer: &mut TableData, services: HashSet<String>) -> Result<()> {
158    for service_name in services {
159        let mut row = writer.alloc_one_row();
160        // Write the timestamp as 0.
161        row_writer::write_ts_to_nanos(
162            writer,
163            TIMESTAMP_COLUMN,
164            Some(4102444800000000000), // Use a timestamp(2100-01-01 00:00:00) as large as possible.
165            Precision::Nanosecond,
166            &mut row,
167        )?;
168
169        // Write the `service_name` column.
170        row_writer::write_fields(
171            writer,
172            std::iter::once(make_string_column_data(
173                SERVICE_NAME_COLUMN,
174                Some(service_name),
175            )),
176            &mut row,
177        )?;
178        writer.add_row(row);
179    }
180
181    Ok(())
182}