1use std::collections::HashSet;
16
17use api::v1::value::ValueData;
18use api::v1::{ColumnDataType, RowInsertRequests, Value};
19use common_catalog::consts::{trace_operations_table_name, trace_services_table_name};
20use common_grpc::precision::Precision;
21use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest;
22use opentelemetry_proto::tonic::common::v1::any_value::Value as OtlpValue;
23use pipeline::{GreptimePipelineParams, PipelineWay};
24use session::context::QueryContextRef;
25
26use crate::error::Result;
27use crate::otlp::trace::attributes::Attributes;
28use crate::otlp::trace::span::{TraceSpan, parse};
29use crate::otlp::trace::{
30 DURATION_NANO_COLUMN, KEY_SERVICE_NAME, PARENT_SPAN_ID_COLUMN, SERVICE_NAME_COLUMN,
31 SPAN_EVENTS_COLUMN, SPAN_ID_COLUMN, SPAN_KIND_COLUMN, SPAN_NAME_COLUMN, TIMESTAMP_COLUMN,
32 TRACE_ID_COLUMN,
33};
34use crate::otlp::utils::{any_value_to_jsonb, make_column_data, make_string_column_data};
35use crate::query_handler::PipelineHandlerRef;
36use crate::row_writer::{self, MultiTableData, TableData};
37
/// Initial column capacity handed to `TableData::new` for every table
/// written by this module.
const APPROXIMATE_COLUMN_COUNT: usize = 30;

/// Constant timestamp, in nanoseconds, stamped on every row of the trace
/// services and trace operations tables: 4_102_444_800 seconds after the
/// Unix epoch, i.e. 2100-01-01T00:00:00Z.
const MAX_TIMESTAMP: i64 = 4_102_444_800_000_000_000;
42
43pub fn v1_to_grpc_insert_requests(
55 request: ExportTraceServiceRequest,
56 _pipeline: PipelineWay,
57 _pipeline_params: GreptimePipelineParams,
58 table_name: String,
59 _query_ctx: &QueryContextRef,
60 _pipeline_handler: PipelineHandlerRef,
61) -> Result<(RowInsertRequests, usize)> {
62 let spans = parse(request);
63 let mut multi_table_writer = MultiTableData::default();
64 let mut trace_writer = TableData::new(APPROXIMATE_COLUMN_COUNT, spans.len());
65 let mut trace_services_writer = TableData::new(APPROXIMATE_COLUMN_COUNT, 1);
66 let mut trace_operations_writer = TableData::new(APPROXIMATE_COLUMN_COUNT, 1);
67
68 let mut services = HashSet::new();
69 let mut operations = HashSet::new();
70 for span in spans {
71 if let Some(service_name) = &span.service_name {
72 if !services.contains(service_name) {
74 services.insert(service_name.clone());
75 }
76
77 let operation = (
79 service_name.clone(),
80 span.span_name.clone(),
81 span.span_kind.clone(),
82 );
83 if !operations.contains(&operation) {
84 operations.insert(operation);
85 }
86 }
87 write_span_to_row(&mut trace_writer, span)?;
88 }
89 write_trace_services_to_row(&mut trace_services_writer, services)?;
90 write_trace_operations_to_row(&mut trace_operations_writer, operations)?;
91
92 multi_table_writer.add_table_data(
93 trace_services_table_name(&table_name),
94 trace_services_writer,
95 );
96 multi_table_writer.add_table_data(
97 trace_operations_table_name(&table_name),
98 trace_operations_writer,
99 );
100 multi_table_writer.add_table_data(table_name, trace_writer);
101
102 Ok(multi_table_writer.into_row_insert_requests())
103}
104
105pub fn write_span_to_row(writer: &mut TableData, span: TraceSpan) -> Result<()> {
106 let mut row = writer.alloc_one_row();
107
108 row_writer::write_ts_to_nanos(
110 writer,
111 TIMESTAMP_COLUMN,
112 Some(span.start_in_nanosecond as i64),
113 Precision::Nanosecond,
114 &mut row,
115 )?;
116
117 let fields = vec![
119 make_column_data(
120 "timestamp_end",
121 ColumnDataType::TimestampNanosecond,
122 Some(ValueData::TimestampNanosecondValue(
123 span.end_in_nanosecond as i64,
124 )),
125 ),
126 make_column_data(
127 DURATION_NANO_COLUMN,
128 ColumnDataType::Uint64,
129 Some(ValueData::U64Value(
130 span.end_in_nanosecond - span.start_in_nanosecond,
131 )),
132 ),
133 make_string_column_data(PARENT_SPAN_ID_COLUMN, span.parent_span_id),
134 make_string_column_data(TRACE_ID_COLUMN, Some(span.trace_id)),
135 make_string_column_data(SPAN_ID_COLUMN, Some(span.span_id)),
136 make_string_column_data(SPAN_KIND_COLUMN, Some(span.span_kind)),
137 make_string_column_data(SPAN_NAME_COLUMN, Some(span.span_name)),
138 make_string_column_data("span_status_code", Some(span.span_status_code)),
139 make_string_column_data("span_status_message", Some(span.span_status_message)),
140 make_string_column_data("trace_state", Some(span.trace_state)),
141 make_string_column_data("scope_name", Some(span.scope_name)),
142 make_string_column_data("scope_version", Some(span.scope_version)),
143 ];
144 row_writer::write_fields(writer, fields.into_iter(), &mut row)?;
145
146 if let Some(service_name) = span.service_name {
147 row_writer::write_tags(
148 writer,
149 std::iter::once((SERVICE_NAME_COLUMN.to_string(), service_name)),
150 &mut row,
151 )?;
152 }
153
154 write_attributes(writer, "span_attributes", span.span_attributes, &mut row)?;
155 write_attributes(writer, "scope_attributes", span.scope_attributes, &mut row)?;
156 write_attributes(
157 writer,
158 "resource_attributes",
159 span.resource_attributes,
160 &mut row,
161 )?;
162
163 row_writer::write_json(
164 writer,
165 SPAN_EVENTS_COLUMN,
166 span.span_events.into(),
167 &mut row,
168 )?;
169 row_writer::write_json(writer, "span_links", span.span_links.into(), &mut row)?;
170
171 writer.add_row(row);
172
173 Ok(())
174}
175
176fn write_trace_services_to_row(writer: &mut TableData, services: HashSet<String>) -> Result<()> {
177 for service_name in services {
178 let mut row = writer.alloc_one_row();
179 row_writer::write_ts_to_nanos(
181 writer,
182 TIMESTAMP_COLUMN,
183 Some(MAX_TIMESTAMP),
184 Precision::Nanosecond,
185 &mut row,
186 )?;
187
188 row_writer::write_tags(
190 writer,
191 std::iter::once((SERVICE_NAME_COLUMN.to_string(), service_name)),
192 &mut row,
193 )?;
194 writer.add_row(row);
195 }
196
197 Ok(())
198}
199
200fn write_trace_operations_to_row(
201 writer: &mut TableData,
202 operations: HashSet<(String, String, String)>,
203) -> Result<()> {
204 for (service_name, span_name, span_kind) in operations {
205 let mut row = writer.alloc_one_row();
206 row_writer::write_ts_to_nanos(
208 writer,
209 TIMESTAMP_COLUMN,
210 Some(MAX_TIMESTAMP),
211 Precision::Nanosecond,
212 &mut row,
213 )?;
214
215 row_writer::write_tags(
217 writer,
218 vec![
219 (SERVICE_NAME_COLUMN.to_string(), service_name),
220 (SPAN_NAME_COLUMN.to_string(), span_name),
221 (SPAN_KIND_COLUMN.to_string(), span_kind),
222 ]
223 .into_iter(),
224 &mut row,
225 )?;
226 writer.add_row(row);
227 }
228
229 Ok(())
230}
231
232fn write_attributes(
233 writer: &mut TableData,
234 prefix: &str,
235 attributes: Attributes,
236 row: &mut Vec<Value>,
237) -> Result<()> {
238 for attr in attributes.take().into_iter() {
239 let key_suffix = attr.key;
240 if prefix == "resource_attributes" && key_suffix == KEY_SERVICE_NAME {
243 continue;
244 }
245
246 let key = format!("{}.{}", prefix, key_suffix);
247 match attr.value.and_then(|v| v.value) {
248 Some(OtlpValue::StringValue(v)) => {
249 row_writer::write_fields(
250 writer,
251 std::iter::once(make_string_column_data(&key, Some(v))),
252 row,
253 )?;
254 }
255 Some(OtlpValue::BoolValue(v)) => {
256 row_writer::write_fields(
257 writer,
258 std::iter::once(make_column_data(
259 &key,
260 ColumnDataType::Boolean,
261 Some(ValueData::BoolValue(v)),
262 )),
263 row,
264 )?;
265 }
266 Some(OtlpValue::IntValue(v)) => {
267 row_writer::write_fields(
268 writer,
269 std::iter::once(make_column_data(
270 &key,
271 ColumnDataType::Int64,
272 Some(ValueData::I64Value(v)),
273 )),
274 row,
275 )?;
276 }
277 Some(OtlpValue::DoubleValue(v)) => {
278 row_writer::write_fields(
279 writer,
280 std::iter::once(make_column_data(
281 &key,
282 ColumnDataType::Float64,
283 Some(ValueData::F64Value(v)),
284 )),
285 row,
286 )?;
287 }
288 Some(OtlpValue::ArrayValue(v)) => row_writer::write_json(
289 writer,
290 key,
291 any_value_to_jsonb(OtlpValue::ArrayValue(v)),
292 row,
293 )?,
294 Some(OtlpValue::KvlistValue(v)) => row_writer::write_json(
295 writer,
296 key,
297 any_value_to_jsonb(OtlpValue::KvlistValue(v)),
298 row,
299 )?,
300 Some(OtlpValue::BytesValue(v)) => {
301 row_writer::write_fields(
302 writer,
303 std::iter::once(make_column_data(
304 &key,
305 ColumnDataType::Binary,
306 Some(ValueData::BinaryValue(v)),
307 )),
308 row,
309 )?;
310 }
311 None => {}
312 }
313 }
314
315 Ok(())
316}