1use std::collections::HashSet;
16
17use api::v1::value::ValueData;
18use api::v1::{ColumnDataType, RowInsertRequests, Value};
19use common_catalog::consts::{trace_operations_table_name, trace_services_table_name};
20use common_grpc::precision::Precision;
21use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest;
22use opentelemetry_proto::tonic::common::v1::any_value::Value as OtlpValue;
23use pipeline::{GreptimePipelineParams, PipelineWay};
24use session::context::QueryContextRef;
25
26use crate::error::Result;
27use crate::otlp::trace::attributes::Attributes;
28use crate::otlp::trace::span::{TraceSpan, parse};
29use crate::otlp::trace::{
30 DURATION_NANO_COLUMN, KEY_SERVICE_NAME, PARENT_SPAN_ID_COLUMN, SCOPE_NAME_COLUMN,
31 SCOPE_VERSION_COLUMN, SERVICE_NAME_COLUMN, SPAN_EVENTS_COLUMN, SPAN_ID_COLUMN,
32 SPAN_KIND_COLUMN, SPAN_NAME_COLUMN, SPAN_STATUS_CODE, SPAN_STATUS_MESSAGE_COLUMN,
33 TIMESTAMP_COLUMN, TRACE_ID_COLUMN, TRACE_STATE_COLUMN,
34};
35use crate::otlp::utils::{any_value_to_jsonb, make_column_data, make_string_column_data};
36use crate::query_handler::PipelineHandlerRef;
37use crate::row_writer::{self, MultiTableData, TableData};
38
/// Column-count hint handed to `TableData::new` for every table built in this module.
const APPROXIMATE_COLUMN_COUNT: usize = 30;

/// Fixed timestamp, in nanoseconds since the Unix epoch, stamped on every row of the
/// trace services and trace operations tables.
///
/// 4_102_444_800 seconds after the epoch is 2100-01-01T00:00:00Z.
const MAX_TIMESTAMP: i64 = 4_102_444_800_000_000_000;
43
44pub fn v1_to_grpc_insert_requests(
56 request: ExportTraceServiceRequest,
57 _pipeline: PipelineWay,
58 _pipeline_params: GreptimePipelineParams,
59 table_name: String,
60 _query_ctx: &QueryContextRef,
61 _pipeline_handler: PipelineHandlerRef,
62) -> Result<(RowInsertRequests, usize)> {
63 let spans = parse(request);
64 let mut multi_table_writer = MultiTableData::default();
65 let mut trace_writer = TableData::new(APPROXIMATE_COLUMN_COUNT, spans.len());
66 let mut trace_services_writer = TableData::new(APPROXIMATE_COLUMN_COUNT, 1);
67 let mut trace_operations_writer = TableData::new(APPROXIMATE_COLUMN_COUNT, 1);
68
69 let mut services = HashSet::new();
70 let mut operations = HashSet::new();
71 for span in spans {
72 if let Some(service_name) = &span.service_name {
73 if !services.contains(service_name) {
75 services.insert(service_name.clone());
76 }
77
78 let operation = (
80 service_name.clone(),
81 span.span_name.clone(),
82 span.span_kind.clone(),
83 );
84 if !operations.contains(&operation) {
85 operations.insert(operation);
86 }
87 }
88 write_span_to_row(&mut trace_writer, span)?;
89 }
90 write_trace_services_to_row(&mut trace_services_writer, services)?;
91 write_trace_operations_to_row(&mut trace_operations_writer, operations)?;
92
93 multi_table_writer.add_table_data(
94 trace_services_table_name(&table_name),
95 trace_services_writer,
96 );
97 multi_table_writer.add_table_data(
98 trace_operations_table_name(&table_name),
99 trace_operations_writer,
100 );
101 multi_table_writer.add_table_data(table_name, trace_writer);
102
103 Ok(multi_table_writer.into_row_insert_requests())
104}
105
106pub fn write_span_to_row(writer: &mut TableData, span: TraceSpan) -> Result<()> {
107 let mut row = writer.alloc_one_row();
108
109 row_writer::write_ts_to_nanos(
111 writer,
112 TIMESTAMP_COLUMN,
113 Some(span.start_in_nanosecond as i64),
114 Precision::Nanosecond,
115 &mut row,
116 )?;
117
118 let fields = vec![
120 make_column_data(
121 "timestamp_end",
122 ColumnDataType::TimestampNanosecond,
123 Some(ValueData::TimestampNanosecondValue(
124 span.end_in_nanosecond as i64,
125 )),
126 ),
127 make_column_data(
128 DURATION_NANO_COLUMN,
129 ColumnDataType::Uint64,
130 Some(ValueData::U64Value(
131 span.end_in_nanosecond - span.start_in_nanosecond,
132 )),
133 ),
134 make_string_column_data(PARENT_SPAN_ID_COLUMN, span.parent_span_id),
135 make_string_column_data(TRACE_ID_COLUMN, Some(span.trace_id)),
136 make_string_column_data(SPAN_ID_COLUMN, Some(span.span_id)),
137 make_string_column_data(SPAN_KIND_COLUMN, Some(span.span_kind)),
138 make_string_column_data(SPAN_NAME_COLUMN, Some(span.span_name)),
139 make_string_column_data(SPAN_STATUS_CODE, Some(span.span_status_code)),
140 make_string_column_data(SPAN_STATUS_MESSAGE_COLUMN, Some(span.span_status_message)),
141 make_string_column_data(TRACE_STATE_COLUMN, Some(span.trace_state)),
142 make_string_column_data(SCOPE_NAME_COLUMN, Some(span.scope_name)),
143 make_string_column_data(SCOPE_VERSION_COLUMN, Some(span.scope_version)),
144 ];
145 row_writer::write_fields(writer, fields.into_iter(), &mut row)?;
146
147 if let Some(service_name) = span.service_name {
148 row_writer::write_tags(
149 writer,
150 std::iter::once((SERVICE_NAME_COLUMN.to_string(), service_name)),
151 &mut row,
152 )?;
153 }
154
155 write_attributes(writer, "span_attributes", span.span_attributes, &mut row)?;
156 write_attributes(writer, "scope_attributes", span.scope_attributes, &mut row)?;
157 write_attributes(
158 writer,
159 "resource_attributes",
160 span.resource_attributes,
161 &mut row,
162 )?;
163
164 row_writer::write_json(
165 writer,
166 SPAN_EVENTS_COLUMN,
167 span.span_events.into(),
168 &mut row,
169 )?;
170 row_writer::write_json(writer, "span_links", span.span_links.into(), &mut row)?;
171
172 writer.add_row(row);
173
174 Ok(())
175}
176
177fn write_trace_services_to_row(writer: &mut TableData, services: HashSet<String>) -> Result<()> {
178 for service_name in services {
179 let mut row = writer.alloc_one_row();
180 row_writer::write_ts_to_nanos(
182 writer,
183 TIMESTAMP_COLUMN,
184 Some(MAX_TIMESTAMP),
185 Precision::Nanosecond,
186 &mut row,
187 )?;
188
189 row_writer::write_tags(
191 writer,
192 std::iter::once((SERVICE_NAME_COLUMN.to_string(), service_name)),
193 &mut row,
194 )?;
195 writer.add_row(row);
196 }
197
198 Ok(())
199}
200
201fn write_trace_operations_to_row(
202 writer: &mut TableData,
203 operations: HashSet<(String, String, String)>,
204) -> Result<()> {
205 for (service_name, span_name, span_kind) in operations {
206 let mut row = writer.alloc_one_row();
207 row_writer::write_ts_to_nanos(
209 writer,
210 TIMESTAMP_COLUMN,
211 Some(MAX_TIMESTAMP),
212 Precision::Nanosecond,
213 &mut row,
214 )?;
215
216 row_writer::write_tags(
218 writer,
219 vec![
220 (SERVICE_NAME_COLUMN.to_string(), service_name),
221 (SPAN_NAME_COLUMN.to_string(), span_name),
222 (SPAN_KIND_COLUMN.to_string(), span_kind),
223 ]
224 .into_iter(),
225 &mut row,
226 )?;
227 writer.add_row(row);
228 }
229
230 Ok(())
231}
232
233fn write_attributes(
234 writer: &mut TableData,
235 prefix: &str,
236 attributes: Attributes,
237 row: &mut Vec<Value>,
238) -> Result<()> {
239 for attr in attributes.take().into_iter() {
240 let key_suffix = attr.key;
241 if prefix == "resource_attributes" && key_suffix == KEY_SERVICE_NAME {
244 continue;
245 }
246
247 let key = format!("{}.{}", prefix, key_suffix);
248 match attr.value.and_then(|v| v.value) {
249 Some(OtlpValue::StringValue(v)) => {
250 row_writer::write_fields(
251 writer,
252 std::iter::once(make_string_column_data(&key, Some(v))),
253 row,
254 )?;
255 }
256 Some(OtlpValue::BoolValue(v)) => {
257 row_writer::write_fields(
258 writer,
259 std::iter::once(make_column_data(
260 &key,
261 ColumnDataType::Boolean,
262 Some(ValueData::BoolValue(v)),
263 )),
264 row,
265 )?;
266 }
267 Some(OtlpValue::IntValue(v)) => {
268 row_writer::write_fields(
269 writer,
270 std::iter::once(make_column_data(
271 &key,
272 ColumnDataType::Int64,
273 Some(ValueData::I64Value(v)),
274 )),
275 row,
276 )?;
277 }
278 Some(OtlpValue::DoubleValue(v)) => {
279 row_writer::write_fields(
280 writer,
281 std::iter::once(make_column_data(
282 &key,
283 ColumnDataType::Float64,
284 Some(ValueData::F64Value(v)),
285 )),
286 row,
287 )?;
288 }
289 Some(OtlpValue::ArrayValue(v)) => row_writer::write_json(
290 writer,
291 key,
292 any_value_to_jsonb(OtlpValue::ArrayValue(v)),
293 row,
294 )?,
295 Some(OtlpValue::KvlistValue(v)) => row_writer::write_json(
296 writer,
297 key,
298 any_value_to_jsonb(OtlpValue::KvlistValue(v)),
299 row,
300 )?,
301 Some(OtlpValue::BytesValue(v)) => {
302 row_writer::write_fields(
303 writer,
304 std::iter::once(make_column_data(
305 &key,
306 ColumnDataType::Binary,
307 Some(ValueData::BinaryValue(v)),
308 )),
309 row,
310 )?;
311 }
312 None => {}
313 }
314 }
315
316 Ok(())
317}