1use std::collections::HashSet;
16
17use api::v1::value::ValueData;
18use api::v1::{ColumnDataType, RowInsertRequests, Value};
19use common_catalog::consts::trace_services_table_name;
20use common_grpc::precision::Precision;
21use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest;
22use opentelemetry_proto::tonic::common::v1::any_value::Value as OtlpValue;
23use pipeline::{GreptimePipelineParams, PipelineWay};
24use session::context::QueryContextRef;
25
26use crate::error::Result;
27use crate::otlp::trace::attributes::Attributes;
28use crate::otlp::trace::span::{parse, TraceSpan};
29use crate::otlp::trace::{
30 DURATION_NANO_COLUMN, KEY_SERVICE_NAME, PARENT_SPAN_ID_COLUMN, SERVICE_NAME_COLUMN,
31 SPAN_EVENTS_COLUMN, SPAN_ID_COLUMN, SPAN_KIND_COLUMN, SPAN_NAME_COLUMN, TIMESTAMP_COLUMN,
32 TRACE_ID_COLUMN,
33};
34use crate::otlp::utils::{any_value_to_jsonb, make_column_data, make_string_column_data};
35use crate::query_handler::PipelineHandlerRef;
36use crate::row_writer::{self, MultiTableData, TableData};
37
38const APPROXIMATE_COLUMN_COUNT: usize = 30;
39
40pub fn v1_to_grpc_insert_requests(
52 request: ExportTraceServiceRequest,
53 _pipeline: PipelineWay,
54 _pipeline_params: GreptimePipelineParams,
55 table_name: String,
56 _query_ctx: &QueryContextRef,
57 _pipeline_handler: PipelineHandlerRef,
58) -> Result<(RowInsertRequests, usize)> {
59 let spans = parse(request);
60 let mut multi_table_writer = MultiTableData::default();
61 let mut trace_writer = TableData::new(APPROXIMATE_COLUMN_COUNT, spans.len());
62 let mut trace_services_writer = TableData::new(APPROXIMATE_COLUMN_COUNT, 1);
63
64 let mut services = HashSet::new();
65 for span in spans {
66 if let Some(service_name) = &span.service_name {
67 if !services.contains(service_name) {
69 services.insert(service_name.clone());
70 }
71 }
72 write_span_to_row(&mut trace_writer, span)?;
73 }
74 write_trace_services_to_row(&mut trace_services_writer, services)?;
75
76 multi_table_writer.add_table_data(
77 trace_services_table_name(&table_name),
78 trace_services_writer,
79 );
80 multi_table_writer.add_table_data(table_name, trace_writer);
81
82 Ok(multi_table_writer.into_row_insert_requests())
83}
84
85pub fn write_span_to_row(writer: &mut TableData, span: TraceSpan) -> Result<()> {
86 let mut row = writer.alloc_one_row();
87
88 row_writer::write_ts_to_nanos(
90 writer,
91 TIMESTAMP_COLUMN,
92 Some(span.start_in_nanosecond as i64),
93 Precision::Nanosecond,
94 &mut row,
95 )?;
96
97 let fields = vec![
99 make_column_data(
100 "timestamp_end",
101 ColumnDataType::TimestampNanosecond,
102 Some(ValueData::TimestampNanosecondValue(
103 span.end_in_nanosecond as i64,
104 )),
105 ),
106 make_column_data(
107 DURATION_NANO_COLUMN,
108 ColumnDataType::Uint64,
109 Some(ValueData::U64Value(
110 span.end_in_nanosecond - span.start_in_nanosecond,
111 )),
112 ),
113 make_string_column_data(PARENT_SPAN_ID_COLUMN, span.parent_span_id),
114 make_string_column_data(TRACE_ID_COLUMN, Some(span.trace_id)),
115 make_string_column_data(SPAN_ID_COLUMN, Some(span.span_id)),
116 make_string_column_data(SPAN_KIND_COLUMN, Some(span.span_kind)),
117 make_string_column_data(SPAN_NAME_COLUMN, Some(span.span_name)),
118 make_string_column_data("span_status_code", Some(span.span_status_code)),
119 make_string_column_data("span_status_message", Some(span.span_status_message)),
120 make_string_column_data("trace_state", Some(span.trace_state)),
121 make_string_column_data("scope_name", Some(span.scope_name)),
122 make_string_column_data("scope_version", Some(span.scope_version)),
123 ];
124 row_writer::write_fields(writer, fields.into_iter(), &mut row)?;
125
126 if let Some(service_name) = span.service_name {
127 row_writer::write_tags(
128 writer,
129 std::iter::once((SERVICE_NAME_COLUMN.to_string(), service_name)),
130 &mut row,
131 )?;
132 }
133
134 write_attributes(writer, "span_attributes", span.span_attributes, &mut row)?;
135 write_attributes(writer, "scope_attributes", span.scope_attributes, &mut row)?;
136 write_attributes(
137 writer,
138 "resource_attributes",
139 span.resource_attributes,
140 &mut row,
141 )?;
142
143 row_writer::write_json(
144 writer,
145 SPAN_EVENTS_COLUMN,
146 span.span_events.into(),
147 &mut row,
148 )?;
149 row_writer::write_json(writer, "span_links", span.span_links.into(), &mut row)?;
150
151 writer.add_row(row);
152
153 Ok(())
154}
155
156fn write_trace_services_to_row(writer: &mut TableData, services: HashSet<String>) -> Result<()> {
157 for service_name in services {
158 let mut row = writer.alloc_one_row();
159 row_writer::write_ts_to_nanos(
161 writer,
162 TIMESTAMP_COLUMN,
163 Some(4102444800000000000), Precision::Nanosecond,
165 &mut row,
166 )?;
167
168 row_writer::write_tags(
170 writer,
171 std::iter::once((SERVICE_NAME_COLUMN.to_string(), service_name)),
172 &mut row,
173 )?;
174 writer.add_row(row);
175 }
176
177 Ok(())
178}
179
180fn write_attributes(
181 writer: &mut TableData,
182 prefix: &str,
183 attributes: Attributes,
184 row: &mut Vec<Value>,
185) -> Result<()> {
186 for attr in attributes.take().into_iter() {
187 let key_suffix = attr.key;
188 if prefix == "resource_attributes" && key_suffix == KEY_SERVICE_NAME {
191 continue;
192 }
193
194 let key = format!("{}.{}", prefix, key_suffix);
195 match attr.value.and_then(|v| v.value) {
196 Some(OtlpValue::StringValue(v)) => {
197 row_writer::write_fields(
198 writer,
199 std::iter::once(make_string_column_data(&key, Some(v))),
200 row,
201 )?;
202 }
203 Some(OtlpValue::BoolValue(v)) => {
204 row_writer::write_fields(
205 writer,
206 std::iter::once(make_column_data(
207 &key,
208 ColumnDataType::Boolean,
209 Some(ValueData::BoolValue(v)),
210 )),
211 row,
212 )?;
213 }
214 Some(OtlpValue::IntValue(v)) => {
215 row_writer::write_fields(
216 writer,
217 std::iter::once(make_column_data(
218 &key,
219 ColumnDataType::Int64,
220 Some(ValueData::I64Value(v)),
221 )),
222 row,
223 )?;
224 }
225 Some(OtlpValue::DoubleValue(v)) => {
226 row_writer::write_fields(
227 writer,
228 std::iter::once(make_column_data(
229 &key,
230 ColumnDataType::Float64,
231 Some(ValueData::F64Value(v)),
232 )),
233 row,
234 )?;
235 }
236 Some(OtlpValue::ArrayValue(v)) => row_writer::write_json(
237 writer,
238 key,
239 any_value_to_jsonb(OtlpValue::ArrayValue(v)),
240 row,
241 )?,
242 Some(OtlpValue::KvlistValue(v)) => row_writer::write_json(
243 writer,
244 key,
245 any_value_to_jsonb(OtlpValue::KvlistValue(v)),
246 row,
247 )?,
248 Some(OtlpValue::BytesValue(v)) => {
249 row_writer::write_fields(
250 writer,
251 std::iter::once(make_column_data(
252 &key,
253 ColumnDataType::Binary,
254 Some(ValueData::BinaryValue(v)),
255 )),
256 row,
257 )?;
258 }
259 None => {}
260 }
261 }
262
263 Ok(())
264}