Skip to main content

servers/otlp/trace/
v1.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashSet;
16
17use api::v1::value::ValueData;
18use api::v1::{ColumnDataType, RowInsertRequests, Value};
19use common_catalog::consts::{trace_operations_table_name, trace_services_table_name};
20use common_grpc::precision::Precision;
21use opentelemetry_proto::tonic::common::v1::any_value::Value as OtlpValue;
22use pipeline::{GreptimePipelineParams, PipelineWay};
23use session::context::QueryContextRef;
24
25use crate::error::Result;
26use crate::otlp::trace::attributes::Attributes;
27use crate::otlp::trace::span::TraceSpan;
28use crate::otlp::trace::{
29    DURATION_NANO_COLUMN, KEY_SERVICE_NAME, PARENT_SPAN_ID_COLUMN, SCOPE_NAME_COLUMN,
30    SCOPE_VERSION_COLUMN, SERVICE_NAME_COLUMN, SPAN_EVENTS_COLUMN, SPAN_ID_COLUMN,
31    SPAN_KIND_COLUMN, SPAN_NAME_COLUMN, SPAN_STATUS_CODE, SPAN_STATUS_MESSAGE_COLUMN,
32    TIMESTAMP_COLUMN, TRACE_ID_COLUMN, TRACE_STATE_COLUMN, TraceAuxData,
33};
34use crate::otlp::utils::{any_value_to_jsonb, make_column_data, make_string_column_data};
35use crate::query_handler::PipelineHandlerRef;
36use crate::row_writer::{self, MultiTableData, TableData};
37
/// Approximate number of columns in a main v1 trace table row, passed to
/// `TableData::new` when allocating a writer.
const APPROXIMATE_COLUMN_COUNT: usize = 30;

/// 2100-01-01 00:00:00 UTC in nanoseconds since the Unix epoch
/// (4_102_444_800 seconds): a deliberately far-future timestamp used for the
/// auxiliary service/operation table rows.
const MAX_TIMESTAMP: i64 = 4_102_444_800 * 1_000_000_000;
42
43/// Converts trace spans into row insert requests for the main v1 trace table.
44///
45/// Auxiliary service and operation table writes are built separately so the
46/// caller can update them only after the main span write succeeds.
47pub fn v1_to_grpc_main_insert_requests(
48    spans: &[TraceSpan],
49    _pipeline: &PipelineWay,
50    _pipeline_params: &GreptimePipelineParams,
51    table_name: &str,
52    _query_ctx: &QueryContextRef,
53    _pipeline_handler: PipelineHandlerRef,
54) -> Result<(RowInsertRequests, usize)> {
55    let mut multi_table_writer = MultiTableData::default();
56    let trace_writer = build_trace_table_data(spans)?;
57    multi_table_writer.add_table_data(table_name, trace_writer);
58
59    Ok(multi_table_writer.into_row_insert_requests())
60}
61
62/// Builds the row-oriented payload for the main v1 trace table.
63pub fn build_trace_table_data(spans: &[TraceSpan]) -> Result<TableData> {
64    let mut trace_writer = TableData::new(APPROXIMATE_COLUMN_COUNT, spans.len());
65    for span in spans.iter().cloned() {
66        write_span_to_row(&mut trace_writer, span)?;
67    }
68
69    Ok(trace_writer)
70}
71
72/// Builds row insert requests for the v1 trace auxiliary tables.
73pub fn build_aux_table_requests(
74    aux_data: TraceAuxData,
75    table_name: &str,
76) -> Result<(RowInsertRequests, usize)> {
77    let mut multi_table_writer = MultiTableData::default();
78    let mut trace_services_writer = TableData::new(APPROXIMATE_COLUMN_COUNT, 1);
79    let mut trace_operations_writer = TableData::new(APPROXIMATE_COLUMN_COUNT, 1);
80
81    write_trace_services_to_row(&mut trace_services_writer, aux_data.services)?;
82    write_trace_operations_to_row(&mut trace_operations_writer, aux_data.operations)?;
83
84    multi_table_writer.add_table_data(trace_services_table_name(table_name), trace_services_writer);
85    multi_table_writer.add_table_data(
86        trace_operations_table_name(table_name),
87        trace_operations_writer,
88    );
89
90    Ok(multi_table_writer.into_row_insert_requests())
91}
92
93pub fn write_span_to_row(writer: &mut TableData, span: TraceSpan) -> Result<()> {
94    let mut row = writer.alloc_one_row();
95
96    // write ts
97    row_writer::write_ts_to_nanos(
98        writer,
99        TIMESTAMP_COLUMN,
100        Some(span.start_in_nanosecond as i64),
101        Precision::Nanosecond,
102        &mut row,
103    )?;
104
105    // write fields
106    let fields = vec![
107        make_column_data(
108            "timestamp_end",
109            ColumnDataType::TimestampNanosecond,
110            Some(ValueData::TimestampNanosecondValue(
111                span.end_in_nanosecond as i64,
112            )),
113        ),
114        make_column_data(
115            DURATION_NANO_COLUMN,
116            ColumnDataType::Uint64,
117            Some(ValueData::U64Value(
118                span.end_in_nanosecond - span.start_in_nanosecond,
119            )),
120        ),
121        make_string_column_data(PARENT_SPAN_ID_COLUMN, span.parent_span_id),
122        make_string_column_data(TRACE_ID_COLUMN, Some(span.trace_id)),
123        make_string_column_data(SPAN_ID_COLUMN, Some(span.span_id)),
124        make_string_column_data(SPAN_KIND_COLUMN, Some(span.span_kind)),
125        make_string_column_data(SPAN_NAME_COLUMN, Some(span.span_name)),
126        make_string_column_data(SPAN_STATUS_CODE, Some(span.span_status_code)),
127        make_string_column_data(SPAN_STATUS_MESSAGE_COLUMN, Some(span.span_status_message)),
128        make_string_column_data(TRACE_STATE_COLUMN, Some(span.trace_state)),
129        make_string_column_data(SCOPE_NAME_COLUMN, Some(span.scope_name)),
130        make_string_column_data(SCOPE_VERSION_COLUMN, Some(span.scope_version)),
131    ];
132    row_writer::write_fields(writer, fields.into_iter(), &mut row)?;
133
134    if let Some(service_name) = span.service_name {
135        row_writer::write_tags(
136            writer,
137            std::iter::once((SERVICE_NAME_COLUMN.to_string(), service_name)),
138            &mut row,
139        )?;
140    }
141
142    write_attributes(writer, "span_attributes", span.span_attributes, &mut row)?;
143    write_attributes(writer, "scope_attributes", span.scope_attributes, &mut row)?;
144    write_attributes(
145        writer,
146        "resource_attributes",
147        span.resource_attributes,
148        &mut row,
149    )?;
150
151    row_writer::write_json(
152        writer,
153        SPAN_EVENTS_COLUMN,
154        span.span_events.into(),
155        &mut row,
156    )?;
157    row_writer::write_json(writer, "span_links", span.span_links.into(), &mut row)?;
158
159    writer.add_row(row);
160
161    Ok(())
162}
163
164fn write_trace_services_to_row(writer: &mut TableData, services: HashSet<String>) -> Result<()> {
165    for service_name in services {
166        let mut row = writer.alloc_one_row();
167        // Write the timestamp as 0.
168        row_writer::write_ts_to_nanos(
169            writer,
170            TIMESTAMP_COLUMN,
171            Some(MAX_TIMESTAMP),
172            Precision::Nanosecond,
173            &mut row,
174        )?;
175
176        // Write the `service_name` column.
177        row_writer::write_tags(
178            writer,
179            std::iter::once((SERVICE_NAME_COLUMN.to_string(), service_name)),
180            &mut row,
181        )?;
182        writer.add_row(row);
183    }
184
185    Ok(())
186}
187
188fn write_trace_operations_to_row(
189    writer: &mut TableData,
190    operations: HashSet<(String, String, String)>,
191) -> Result<()> {
192    for (service_name, span_name, span_kind) in operations {
193        let mut row = writer.alloc_one_row();
194        // Write the timestamp as 0.
195        row_writer::write_ts_to_nanos(
196            writer,
197            TIMESTAMP_COLUMN,
198            Some(MAX_TIMESTAMP),
199            Precision::Nanosecond,
200            &mut row,
201        )?;
202
203        // Write the `service_name`, `span_name`, and `span_kind` columns as tags.
204        row_writer::write_tags(
205            writer,
206            vec![
207                (SERVICE_NAME_COLUMN.to_string(), service_name),
208                (SPAN_NAME_COLUMN.to_string(), span_name),
209                (SPAN_KIND_COLUMN.to_string(), span_kind),
210            ]
211            .into_iter(),
212            &mut row,
213        )?;
214        writer.add_row(row);
215    }
216
217    Ok(())
218}
219
220pub(crate) fn write_attributes(
221    writer: &mut TableData,
222    prefix: &str,
223    attributes: Attributes,
224    row: &mut Vec<Value>,
225) -> Result<()> {
226    for attr in attributes.take().into_iter() {
227        let key_suffix = attr.key;
228        // skip resource_attributes.service.name because its already copied to
229        // top level as `SERVICE_NAME_COLUMN`
230        if prefix == "resource_attributes" && key_suffix == KEY_SERVICE_NAME {
231            continue;
232        }
233
234        let key = format!("{}.{}", prefix, key_suffix);
235        match attr.value.and_then(|v| v.value) {
236            Some(OtlpValue::StringValue(v)) => {
237                // Keep the raw request value here. Mixed trace types are reconciled later
238                // in the frontend once we can also see the existing table schema.
239                writer.write_field_unchecked(
240                    &key,
241                    ColumnDataType::String,
242                    Some(ValueData::StringValue(v)),
243                    row,
244                );
245            }
246            Some(OtlpValue::BoolValue(v)) => {
247                // Do not coerce or promote types while building the request-local rows.
248                writer.write_field_unchecked(
249                    &key,
250                    ColumnDataType::Boolean,
251                    Some(ValueData::BoolValue(v)),
252                    row,
253                );
254            }
255            Some(OtlpValue::IntValue(v)) => {
256                // Preserving the original value avoids order-dependent behavior inside one batch.
257                writer.write_field_unchecked(
258                    &key,
259                    ColumnDataType::Int64,
260                    Some(ValueData::I64Value(v)),
261                    row,
262                );
263            }
264            Some(OtlpValue::DoubleValue(v)) => {
265                writer.write_field_unchecked(
266                    &key,
267                    ColumnDataType::Float64,
268                    Some(ValueData::F64Value(v)),
269                    row,
270                );
271            }
272            Some(OtlpValue::ArrayValue(v)) => row_writer::write_json(
273                writer,
274                key,
275                any_value_to_jsonb(OtlpValue::ArrayValue(v)),
276                row,
277            )?,
278            Some(OtlpValue::KvlistValue(v)) => row_writer::write_json(
279                writer,
280                key,
281                any_value_to_jsonb(OtlpValue::KvlistValue(v)),
282                row,
283            )?,
284            Some(OtlpValue::BytesValue(v)) => {
285                row_writer::write_fields(
286                    writer,
287                    std::iter::once(make_column_data(
288                        &key,
289                        ColumnDataType::Binary,
290                        Some(ValueData::BinaryValue(v)),
291                    )),
292                    row,
293                )?;
294            }
295            None => {}
296        }
297    }
298
299    Ok(())
300}
301
#[cfg(test)]
mod tests {
    use api::v1::value::ValueData;
    use opentelemetry_proto::tonic::common::v1::any_value::Value as OtlpValue;
    use opentelemetry_proto::tonic::common::v1::{AnyValue, KeyValue};

    use super::*;
    use crate::otlp::trace::TraceAuxData;
    use crate::otlp::trace::attributes::Attributes;
    use crate::otlp::trace::span::{SpanEvents, SpanLinks};
    use crate::row_writer::TableData;

    /// Builds the OTLP attribute `key = value`.
    fn make_kv(key: &str, value: OtlpValue) -> KeyValue {
        let value = Some(AnyValue { value: Some(value) });
        KeyValue {
            key: key.to_string(),
            value,
        }
    }

    /// Builds a minimal span for `service_name` with fixed scope, name, kind,
    /// status and a 1ns..2ns time range.
    fn make_span(service_name: &str, trace_id: &str, span_id: &str) -> TraceSpan {
        TraceSpan {
            service_name: Some(service_name.to_string()),
            trace_id: trace_id.to_string(),
            span_id: span_id.to_string(),
            parent_span_id: None,
            resource_attributes: Attributes::from(vec![]),
            scope_name: "scope".to_string(),
            scope_version: "v1".to_string(),
            scope_attributes: Attributes::from(vec![]),
            trace_state: String::new(),
            span_name: "op".to_string(),
            span_kind: "SPAN_KIND_SERVER".to_string(),
            span_status_code: "STATUS_CODE_UNSET".to_string(),
            span_status_message: String::new(),
            span_attributes: Attributes::from(vec![]),
            span_events: SpanEvents::from(vec![]),
            span_links: SpanLinks::from(vec![]),
            start_in_nanosecond: 1,
            end_in_nanosecond: 2,
        }
    }

    /// Writes `first` and then `second` as the `val` attribute (prefix `attr`)
    /// of two consecutive rows. Returns the datatype recorded for the
    /// `attr.val` column together with the value stored in each row.
    fn write_val_in_two_rows(
        first: OtlpValue,
        second: OtlpValue,
    ) -> (i32, Option<ValueData>, Option<ValueData>) {
        let mut writer = TableData::new(4, 2);
        for value in [first, second] {
            let attrs = Attributes::from(vec![make_kv("val", value)]);
            let mut row = writer.alloc_one_row();
            write_attributes(&mut writer, "attr", attrs, &mut row).unwrap();
            writer.add_row(row);
        }

        let (schema, rows) = writer.into_schema_and_rows();
        let col_idx = schema
            .iter()
            .position(|c| c.column_name == "attr.val")
            .unwrap();
        (
            schema[col_idx].datatype,
            rows[0].values[col_idx].value_data.clone(),
            rows[1].values[col_idx].value_data.clone(),
        )
    }

    #[test]
    fn test_keep_mixed_numeric_values_until_frontend_reconciliation() {
        let (datatype, first, second) =
            write_val_in_two_rows(OtlpValue::DoubleValue(1.5), OtlpValue::IntValue(42));
        assert_eq!(datatype, ColumnDataType::Float64 as i32);
        assert_eq!(first, Some(ValueData::F64Value(1.5)));
        assert_eq!(second, Some(ValueData::I64Value(42)));
    }

    #[test]
    fn test_keep_mixed_string_and_int_values_until_frontend_reconciliation() {
        let (datatype, _, second) = write_val_in_two_rows(
            OtlpValue::IntValue(10),
            OtlpValue::StringValue("20".to_string()),
        );
        assert_eq!(datatype, ColumnDataType::Int64 as i32);
        assert_eq!(second, Some(ValueData::StringValue("20".to_string())));
    }

    #[test]
    fn test_keep_first_seen_schema_until_frontend_reconciliation() {
        let (datatype, first, second) = write_val_in_two_rows(
            OtlpValue::StringValue("10".to_string()),
            OtlpValue::IntValue(20),
        );
        assert_eq!(datatype, ColumnDataType::String as i32);
        assert_eq!(first, Some(ValueData::StringValue("10".to_string())));
        assert_eq!(second, Some(ValueData::I64Value(20)));
    }

    #[test]
    fn test_keep_mixed_string_and_float_values_until_frontend_reconciliation() {
        let (datatype, _, second) = write_val_in_two_rows(
            OtlpValue::DoubleValue(1.5),
            OtlpValue::StringValue("1.5".to_string()),
        );
        assert_eq!(datatype, ColumnDataType::Float64 as i32);
        assert_eq!(second, Some(ValueData::StringValue("1.5".to_string())));
    }

    #[test]
    fn test_keep_mixed_string_and_bool_values_until_frontend_reconciliation() {
        let (datatype, first, second) = write_val_in_two_rows(
            OtlpValue::StringValue("true".to_string()),
            OtlpValue::BoolValue(false),
        );
        assert_eq!(datatype, ColumnDataType::String as i32);
        assert_eq!(first, Some(ValueData::StringValue("true".to_string())));
        assert_eq!(second, Some(ValueData::BoolValue(false)));
    }

    #[test]
    fn test_keep_mixed_binary_and_string_values_until_frontend_reconciliation() {
        let (datatype, first, second) = write_val_in_two_rows(
            OtlpValue::BytesValue(vec![1_u8, 2, 3]),
            OtlpValue::StringValue("false".to_string()),
        );
        assert_eq!(datatype, ColumnDataType::Binary as i32);
        assert_eq!(first, Some(ValueData::BinaryValue(vec![1_u8, 2, 3])));
        assert_eq!(second, Some(ValueData::StringValue("false".to_string())));
    }

    #[test]
    fn test_build_aux_table_requests_deduplicates_services_and_operations() {
        let spans = [
            make_span("svc-a", "trace-a", "span-a"),
            make_span("svc-a", "trace-b", "span-b"),
        ];
        let mut aux_data = TraceAuxData::default();
        for span in spans.iter() {
            aux_data.observe_span(span);
        }

        let (requests, total_rows) =
            build_aux_table_requests(aux_data, "opentelemetry_traces").unwrap();
        assert_eq!(requests.inserts.len(), 2);
        assert_eq!(total_rows, 2);
    }
    // Conversion matrix coverage lives in the shared coercion helper tests.
}