Skip to main content

servers/otlp/trace/
v0.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashSet;
16
17use api::v1::value::ValueData;
18use api::v1::{ColumnDataType, RowInsertRequests};
19use common_catalog::consts::{trace_operations_table_name, trace_services_table_name};
20use common_grpc::precision::Precision;
21use pipeline::{GreptimePipelineParams, PipelineWay};
22use session::context::QueryContextRef;
23
24use crate::error::Result;
25use crate::otlp::trace::span::TraceSpan;
26use crate::otlp::trace::{
27    DURATION_NANO_COLUMN, PARENT_SPAN_ID_COLUMN, SERVICE_NAME_COLUMN, SPAN_ATTRIBUTES_COLUMN,
28    SPAN_EVENTS_COLUMN, SPAN_ID_COLUMN, SPAN_KIND_COLUMN, SPAN_NAME_COLUMN, SPAN_STATUS_CODE,
29    SPAN_STATUS_MESSAGE_COLUMN, TIMESTAMP_COLUMN, TRACE_ID_COLUMN, TRACE_STATE_COLUMN,
30    TraceAuxData,
31};
32use crate::otlp::utils::{make_column_data, make_string_column_data};
33use crate::query_handler::PipelineHandlerRef;
34use crate::row_writer::{self, MultiTableData, TableData};
35
/// Column-count capacity hint handed to `TableData::new` for every table
/// built in this file. It is only a pre-allocation hint, not a limit.
const APPROXIMATE_COLUMN_COUNT: usize = 24;

/// Fixed timestamp, in nanoseconds since the Unix epoch, written to every row
/// of the auxiliary service and operation tables.
///
/// The value corresponds to 2100-01-01 00:00:00 UTC and is deliberately chosen
/// to be as far in the future as practical.
const MAX_TIMESTAMP: i64 = 4_102_444_800_000_000_000;
40
41/// Converts trace spans into row insert requests for the main v0 trace table.
42///
43/// Auxiliary service and operation table writes are built separately so the
44/// caller can update them only after the main span write succeeds.
45pub fn v0_to_grpc_main_insert_requests(
46    spans: &[TraceSpan],
47    _pipeline: &PipelineWay,
48    _pipeline_params: &GreptimePipelineParams,
49    table_name: &str,
50    _query_ctx: &QueryContextRef,
51    _pipeline_handler: PipelineHandlerRef,
52) -> Result<(RowInsertRequests, usize)> {
53    let mut multi_table_writer = MultiTableData::default();
54    let trace_writer = build_trace_table_data(spans)?;
55    multi_table_writer.add_table_data(table_name, trace_writer);
56
57    Ok(multi_table_writer.into_row_insert_requests())
58}
59
60/// Builds the row-oriented payload for the main v0 trace table.
61pub fn build_trace_table_data(spans: &[TraceSpan]) -> Result<TableData> {
62    let mut trace_writer = TableData::new(APPROXIMATE_COLUMN_COUNT, spans.len());
63    for span in spans.iter().cloned() {
64        write_span_to_row(&mut trace_writer, span)?;
65    }
66
67    Ok(trace_writer)
68}
69
70/// Builds row insert requests for the v0 trace auxiliary tables.
71pub fn build_aux_table_requests(
72    aux_data: TraceAuxData,
73    table_name: &str,
74) -> Result<(RowInsertRequests, usize)> {
75    let mut multi_table_writer = MultiTableData::default();
76    let mut trace_services_writer = TableData::new(APPROXIMATE_COLUMN_COUNT, 1);
77    let mut trace_operations_writer = TableData::new(APPROXIMATE_COLUMN_COUNT, 1);
78
79    write_trace_services_to_row(&mut trace_services_writer, aux_data.services)?;
80    write_trace_operations_to_row(&mut trace_operations_writer, aux_data.operations)?;
81
82    multi_table_writer.add_table_data(trace_services_table_name(table_name), trace_services_writer);
83    multi_table_writer.add_table_data(
84        trace_operations_table_name(table_name),
85        trace_operations_writer,
86    );
87    Ok(multi_table_writer.into_row_insert_requests())
88}
89
90pub fn write_span_to_row(writer: &mut TableData, span: TraceSpan) -> Result<()> {
91    let mut row = writer.alloc_one_row();
92
93    // write ts
94    row_writer::write_ts_to_nanos(
95        writer,
96        TIMESTAMP_COLUMN,
97        Some(span.start_in_nanosecond as i64),
98        Precision::Nanosecond,
99        &mut row,
100    )?;
101
102    // write fields
103    let fields = vec![
104        make_column_data(
105            "timestamp_end",
106            ColumnDataType::TimestampNanosecond,
107            Some(ValueData::TimestampNanosecondValue(
108                span.end_in_nanosecond as i64,
109            )),
110        ),
111        make_column_data(
112            DURATION_NANO_COLUMN,
113            ColumnDataType::Uint64,
114            Some(ValueData::U64Value(
115                span.end_in_nanosecond - span.start_in_nanosecond,
116            )),
117        ),
118        make_string_column_data(TRACE_ID_COLUMN, Some(span.trace_id)),
119        make_string_column_data(SPAN_ID_COLUMN, Some(span.span_id)),
120        make_string_column_data(PARENT_SPAN_ID_COLUMN, span.parent_span_id),
121        make_string_column_data(SPAN_KIND_COLUMN, Some(span.span_kind)),
122        make_string_column_data(SPAN_NAME_COLUMN, Some(span.span_name)),
123        make_string_column_data(SPAN_STATUS_CODE, Some(span.span_status_code)),
124        make_string_column_data(SPAN_STATUS_MESSAGE_COLUMN, Some(span.span_status_message)),
125        make_string_column_data(TRACE_STATE_COLUMN, Some(span.trace_state)),
126    ];
127    row_writer::write_fields(writer, fields.into_iter(), &mut row)?;
128
129    if let Some(service_name) = span.service_name {
130        row_writer::write_tag(writer, SERVICE_NAME_COLUMN, service_name, &mut row)?;
131    }
132
133    row_writer::write_json(
134        writer,
135        SPAN_ATTRIBUTES_COLUMN,
136        span.span_attributes.into(),
137        &mut row,
138    )?;
139    row_writer::write_json(
140        writer,
141        SPAN_EVENTS_COLUMN,
142        span.span_events.into(),
143        &mut row,
144    )?;
145    row_writer::write_json(writer, "span_links", span.span_links.into(), &mut row)?;
146
147    // write fields
148    let fields = vec![
149        make_string_column_data("scope_name", Some(span.scope_name)),
150        make_string_column_data("scope_version", Some(span.scope_version)),
151    ];
152    row_writer::write_fields(writer, fields.into_iter(), &mut row)?;
153
154    row_writer::write_json(
155        writer,
156        "scope_attributes",
157        span.scope_attributes.into(),
158        &mut row,
159    )?;
160
161    row_writer::write_json(
162        writer,
163        "resource_attributes",
164        span.resource_attributes.into(),
165        &mut row,
166    )?;
167
168    writer.add_row(row);
169
170    Ok(())
171}
172
173fn write_trace_services_to_row(writer: &mut TableData, services: HashSet<String>) -> Result<()> {
174    for service_name in services {
175        let mut row = writer.alloc_one_row();
176        // Write the timestamp as 0.
177        row_writer::write_ts_to_nanos(
178            writer,
179            TIMESTAMP_COLUMN,
180            Some(MAX_TIMESTAMP),
181            Precision::Nanosecond,
182            &mut row,
183        )?;
184
185        // Write the `service_name` column.
186        row_writer::write_fields(
187            writer,
188            std::iter::once(make_string_column_data(
189                SERVICE_NAME_COLUMN,
190                Some(service_name),
191            )),
192            &mut row,
193        )?;
194        writer.add_row(row);
195    }
196
197    Ok(())
198}
199
200fn write_trace_operations_to_row(
201    writer: &mut TableData,
202    operations: HashSet<(String, String, String)>,
203) -> Result<()> {
204    for (service_name, span_name, span_kind) in operations {
205        let mut row = writer.alloc_one_row();
206        // Write the timestamp as 0.
207        row_writer::write_ts_to_nanos(
208            writer,
209            TIMESTAMP_COLUMN,
210            Some(MAX_TIMESTAMP),
211            Precision::Nanosecond,
212            &mut row,
213        )?;
214
215        // Write the `service_name`, `span_name`, and `span_kind` columns.
216        row_writer::write_fields(
217            writer,
218            vec![
219                make_string_column_data(SERVICE_NAME_COLUMN, Some(service_name)),
220                make_string_column_data(SPAN_NAME_COLUMN, Some(span_name)),
221                make_string_column_data(SPAN_KIND_COLUMN, Some(span_kind)),
222            ]
223            .into_iter(),
224            &mut row,
225        )?;
226        writer.add_row(row);
227    }
228
229    Ok(())
230}
231
#[cfg(test)]
mod tests {
    use super::{build_aux_table_requests, build_trace_table_data};
    use crate::otlp::trace::TraceAuxData;
    use crate::otlp::trace::attributes::Attributes;
    use crate::otlp::trace::span::{SpanEvents, SpanLinks, TraceSpan};

    /// Builds a minimal span fixture: only the service name, trace id and
    /// span id vary; every other field is a fixed placeholder.
    fn make_span(service_name: &str, trace_id: &str, span_id: &str) -> TraceSpan {
        let service_name = Some(service_name.to_string());
        let trace_id = trace_id.to_string();
        let span_id = span_id.to_string();

        TraceSpan {
            service_name,
            trace_id,
            span_id,
            parent_span_id: None,
            resource_attributes: Attributes::from(vec![]),
            scope_name: "scope".to_string(),
            scope_version: "v1".to_string(),
            scope_attributes: Attributes::from(vec![]),
            trace_state: String::new(),
            span_name: "op".to_string(),
            span_kind: "SPAN_KIND_SERVER".to_string(),
            span_status_code: "STATUS_CODE_UNSET".to_string(),
            span_status_message: String::new(),
            span_attributes: Attributes::from(vec![]),
            span_events: SpanEvents::from(vec![]),
            span_links: SpanLinks::from(vec![]),
            start_in_nanosecond: 1,
            end_in_nanosecond: 2,
        }
    }

    /// Only the spans contained in the slice handed over produce rows.
    #[test]
    fn test_build_trace_table_data_from_span_subset() {
        let all_spans = [
            make_span("svc-a", "trace-a", "span-a"),
            make_span("svc-b", "trace-b", "span-b"),
        ];
        let first_only = &all_spans[..1];

        let table = build_trace_table_data(first_only).unwrap();
        let (_, rows) = table.into_schema_and_rows();
        assert_eq!(rows.len(), 1);
    }

    /// Two spans sharing service, name and kind yield a single service row
    /// and a single operation row, spread over the two auxiliary tables.
    #[test]
    fn test_build_aux_table_requests_deduplicates_services_and_operations() {
        let spans = vec![
            make_span("svc-a", "trace-a", "span-a"),
            make_span("svc-a", "trace-b", "span-b"),
        ];

        let mut aux_data = TraceAuxData::default();
        spans.iter().for_each(|span| aux_data.observe_span(span));

        let (requests, total_rows) =
            build_aux_table_requests(aux_data, "opentelemetry_traces").unwrap();
        assert_eq!(requests.inserts.len(), 2);
        assert_eq!(total_rows, 2);
    }
}
290}