1use std::collections::HashSet;
16
17use api::v1::value::ValueData;
18use api::v1::{ColumnDataType, RowInsertRequests, Value};
19use common_catalog::consts::trace_services_table_name;
20use common_grpc::precision::Precision;
21use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest;
22use opentelemetry_proto::tonic::common::v1::any_value::Value as OtlpValue;
23use pipeline::{GreptimePipelineParams, PipelineWay};
24use session::context::QueryContextRef;
25
26use crate::error::Result;
27use crate::otlp::trace::attributes::Attributes;
28use crate::otlp::trace::span::{parse, TraceSpan};
29use crate::otlp::trace::{
30 DURATION_NANO_COLUMN, KEY_SERVICE_NAME, PARENT_SPAN_ID_COLUMN, SERVICE_NAME_COLUMN,
31 SPAN_EVENTS_COLUMN, SPAN_ID_COLUMN, SPAN_KIND_COLUMN, SPAN_NAME_COLUMN, TIMESTAMP_COLUMN,
32 TRACE_ID_COLUMN,
33};
34use crate::otlp::utils::{any_value_to_jsonb, make_column_data, make_string_column_data};
35use crate::query_handler::PipelineHandlerRef;
36use crate::row_writer::{self, MultiTableData, TableData};
37
38const APPROXIMATE_COLUMN_COUNT: usize = 30;
39
40pub fn v1_to_grpc_insert_requests(
52 request: ExportTraceServiceRequest,
53 _pipeline: PipelineWay,
54 _pipeline_params: GreptimePipelineParams,
55 table_name: String,
56 _query_ctx: &QueryContextRef,
57 _pipeline_handler: PipelineHandlerRef,
58) -> Result<(RowInsertRequests, usize)> {
59 let spans = parse(request);
60 let mut multi_table_writer = MultiTableData::default();
61 let mut trace_writer = TableData::new(APPROXIMATE_COLUMN_COUNT, spans.len());
62 let mut trace_services_writer = TableData::new(APPROXIMATE_COLUMN_COUNT, 1);
63
64 let mut services = HashSet::new();
65 for span in spans {
66 if let Some(service_name) = &span.service_name {
67 if !services.contains(service_name) {
69 services.insert(service_name.clone());
70 }
71 }
72 write_span_to_row(&mut trace_writer, span)?;
73 }
74 write_trace_services_to_row(&mut trace_services_writer, services)?;
75
76 multi_table_writer.add_table_data(
77 trace_services_table_name(&table_name),
78 trace_services_writer,
79 );
80 multi_table_writer.add_table_data(table_name, trace_writer);
81
82 Ok(multi_table_writer.into_row_insert_requests())
83}
84
85pub fn write_span_to_row(writer: &mut TableData, span: TraceSpan) -> Result<()> {
86 let mut row = writer.alloc_one_row();
87
88 row_writer::write_ts_to_nanos(
90 writer,
91 TIMESTAMP_COLUMN,
92 Some(span.start_in_nanosecond as i64),
93 Precision::Nanosecond,
94 &mut row,
95 )?;
96 let fields = vec![
98 make_column_data(
99 "timestamp_end",
100 ColumnDataType::TimestampNanosecond,
101 ValueData::TimestampNanosecondValue(span.end_in_nanosecond as i64),
102 ),
103 make_column_data(
104 DURATION_NANO_COLUMN,
105 ColumnDataType::Uint64,
106 ValueData::U64Value(span.end_in_nanosecond - span.start_in_nanosecond),
107 ),
108 ];
109 row_writer::write_fields(writer, fields.into_iter(), &mut row)?;
110
111 if let Some(parent_span_id) = span.parent_span_id {
113 row_writer::write_fields(
114 writer,
115 std::iter::once(make_string_column_data(
116 PARENT_SPAN_ID_COLUMN,
117 parent_span_id,
118 )),
119 &mut row,
120 )?;
121 }
122
123 let fields = vec![
124 make_string_column_data(TRACE_ID_COLUMN, span.trace_id),
125 make_string_column_data(SPAN_ID_COLUMN, span.span_id),
126 make_string_column_data(SPAN_KIND_COLUMN, span.span_kind),
127 make_string_column_data(SPAN_NAME_COLUMN, span.span_name),
128 make_string_column_data("span_status_code", span.span_status_code),
129 make_string_column_data("span_status_message", span.span_status_message),
130 make_string_column_data("trace_state", span.trace_state),
131 make_string_column_data("scope_name", span.scope_name),
132 make_string_column_data("scope_version", span.scope_version),
133 ];
134 row_writer::write_fields(writer, fields.into_iter(), &mut row)?;
135
136 if let Some(service_name) = span.service_name {
137 row_writer::write_tags(
138 writer,
139 std::iter::once((SERVICE_NAME_COLUMN.to_string(), service_name)),
140 &mut row,
141 )?;
142 }
143
144 write_attributes(writer, "span_attributes", span.span_attributes, &mut row)?;
145 write_attributes(writer, "scope_attributes", span.scope_attributes, &mut row)?;
146 write_attributes(
147 writer,
148 "resource_attributes",
149 span.resource_attributes,
150 &mut row,
151 )?;
152
153 row_writer::write_json(
154 writer,
155 SPAN_EVENTS_COLUMN,
156 span.span_events.into(),
157 &mut row,
158 )?;
159 row_writer::write_json(writer, "span_links", span.span_links.into(), &mut row)?;
160
161 writer.add_row(row);
162
163 Ok(())
164}
165
166fn write_trace_services_to_row(writer: &mut TableData, services: HashSet<String>) -> Result<()> {
167 for service_name in services {
168 let mut row = writer.alloc_one_row();
169 row_writer::write_ts_to_nanos(
171 writer,
172 TIMESTAMP_COLUMN,
173 Some(4102444800000000000), Precision::Nanosecond,
175 &mut row,
176 )?;
177
178 row_writer::write_tags(
180 writer,
181 std::iter::once((SERVICE_NAME_COLUMN.to_string(), service_name)),
182 &mut row,
183 )?;
184 writer.add_row(row);
185 }
186
187 Ok(())
188}
189
190fn write_attributes(
191 writer: &mut TableData,
192 prefix: &str,
193 attributes: Attributes,
194 row: &mut Vec<Value>,
195) -> Result<()> {
196 for attr in attributes.take().into_iter() {
197 let key_suffix = attr.key;
198 if prefix == "resource_attributes" && key_suffix == KEY_SERVICE_NAME {
201 continue;
202 }
203
204 let key = format!("{}.{}", prefix, key_suffix);
205 match attr.value.and_then(|v| v.value) {
206 Some(OtlpValue::StringValue(v)) => {
207 row_writer::write_fields(
208 writer,
209 std::iter::once(make_string_column_data(&key, v)),
210 row,
211 )?;
212 }
213 Some(OtlpValue::BoolValue(v)) => {
214 row_writer::write_fields(
215 writer,
216 std::iter::once(make_column_data(
217 &key,
218 ColumnDataType::Boolean,
219 ValueData::BoolValue(v),
220 )),
221 row,
222 )?;
223 }
224 Some(OtlpValue::IntValue(v)) => {
225 row_writer::write_fields(
226 writer,
227 std::iter::once(make_column_data(
228 &key,
229 ColumnDataType::Int64,
230 ValueData::I64Value(v),
231 )),
232 row,
233 )?;
234 }
235 Some(OtlpValue::DoubleValue(v)) => {
236 row_writer::write_fields(
237 writer,
238 std::iter::once(make_column_data(
239 &key,
240 ColumnDataType::Float64,
241 ValueData::F64Value(v),
242 )),
243 row,
244 )?;
245 }
246 Some(OtlpValue::ArrayValue(v)) => row_writer::write_json(
247 writer,
248 key,
249 any_value_to_jsonb(OtlpValue::ArrayValue(v)),
250 row,
251 )?,
252 Some(OtlpValue::KvlistValue(v)) => row_writer::write_json(
253 writer,
254 key,
255 any_value_to_jsonb(OtlpValue::KvlistValue(v)),
256 row,
257 )?,
258 Some(OtlpValue::BytesValue(v)) => {
259 row_writer::write_fields(
260 writer,
261 std::iter::once(make_column_data(
262 &key,
263 ColumnDataType::Binary,
264 ValueData::BinaryValue(v),
265 )),
266 row,
267 )?;
268 }
269 None => {}
270 }
271 }
272
273 Ok(())
274}