1use api::v1::{ColumnDataType, Row};
16use servers::error::{self, Result as ServerResult};
17use servers::otlp::trace::coerce::{
18 coerce_value_data, is_supported_trace_coercion, resolve_new_trace_column_type,
19 trace_value_datatype,
20};
21
22use crate::instance::otlp::trace_semconv::trace_semconv_fixed_type;
23
24#[derive(Debug, Clone, Copy, PartialEq, Eq)]
25pub(super) enum TraceReconcileDecision {
26 UseExisting(ColumnDataType),
27 UseRequestLocal(ColumnDataType),
28 AlterExistingTo(ColumnDataType),
29}
30
31impl TraceReconcileDecision {
32 pub(super) fn target_type(self) -> ColumnDataType {
33 match self {
34 Self::UseExisting(target_type)
35 | Self::UseRequestLocal(target_type)
36 | Self::AlterExistingTo(target_type) => target_type,
37 }
38 }
39
40 pub(super) fn requires_alter(self) -> bool {
41 matches!(self, Self::AlterExistingTo(_))
42 }
43}
44
45pub(super) struct PendingTraceColumnRewrite {
46 pub(super) col_idx: usize,
47 pub(super) target_type: ColumnDataType,
48 pub(super) column_name: String,
49}
50
51pub(super) fn choose_trace_reconcile_decision(
56 column_name: &str,
57 observed_types: &[ColumnDataType],
58 existing_type: Option<ColumnDataType>,
59) -> ServerResult<Option<TraceReconcileDecision>> {
60 if let Some(fixed_type) = trace_semconv_fixed_type(column_name) {
61 return choose_fixed_trace_reconcile_decision(fixed_type, observed_types, existing_type);
62 }
63
64 let Some(existing_type) = existing_type else {
65 return resolve_new_trace_column_type(observed_types.iter().copied())
66 .map(|target_type| target_type.map(TraceReconcileDecision::UseRequestLocal))
67 .map_err(|_| {
68 error::InvalidParameterSnafu {
69 reason: "unsupported trace type mix".to_string(),
70 }
71 .build()
72 });
73 };
74
75 if observed_types.iter().all(|&request_type| {
76 request_type == existing_type || is_supported_trace_coercion(request_type, existing_type)
77 }) {
78 return Ok(Some(TraceReconcileDecision::UseExisting(existing_type)));
79 }
80
81 if existing_type == ColumnDataType::Int64
82 && observed_types.contains(&ColumnDataType::Float64)
83 && observed_types.iter().all(|observed_type| {
84 matches!(
85 observed_type,
86 ColumnDataType::Int64 | ColumnDataType::Float64
87 )
88 })
89 {
90 return Ok(Some(TraceReconcileDecision::AlterExistingTo(
91 ColumnDataType::Float64,
92 )));
93 }
94
95 error::InvalidParameterSnafu {
96 reason: "unsupported trace type mix".to_string(),
97 }
98 .fail()
99}
100
101fn choose_fixed_trace_reconcile_decision(
102 fixed_type: ColumnDataType,
103 observed_types: &[ColumnDataType],
104 existing_type: Option<ColumnDataType>,
105) -> ServerResult<Option<TraceReconcileDecision>> {
106 let Some(existing_type) = existing_type else {
107 return Ok(Some(TraceReconcileDecision::UseRequestLocal(fixed_type)));
108 };
109
110 if existing_type == fixed_type {
111 return Ok(Some(TraceReconcileDecision::UseExisting(fixed_type)));
112 }
113
114 if fixed_type == ColumnDataType::Float64
115 && existing_type == ColumnDataType::Int64
116 && observed_types.iter().all(|observed_type| {
117 matches!(
118 observed_type,
119 ColumnDataType::Int64 | ColumnDataType::Float64
120 )
121 })
122 {
123 return Ok(Some(TraceReconcileDecision::AlterExistingTo(fixed_type)));
124 }
125
126 error::InvalidParameterSnafu {
127 reason: "unsupported trace type mix".to_string(),
128 }
129 .fail()
130}
131
132pub(super) fn validate_trace_column_rewrites(
134 rows: &[Row],
135 pending_rewrites: &[PendingTraceColumnRewrite],
136 table_name: &str,
137) -> ServerResult<()> {
138 for row in rows {
139 for pending_rewrite in pending_rewrites {
140 let Some(value) = row.values.get(pending_rewrite.col_idx) else {
141 continue;
142 };
143 let Some(request_type) = value.value_data.as_ref().and_then(trace_value_datatype)
144 else {
145 continue;
146 };
147 if request_type == pending_rewrite.target_type {
148 continue;
149 }
150
151 coerce_value_data(&value.value_data, pending_rewrite.target_type, request_type)
152 .map_err(|_| {
153 error::InvalidParameterSnafu {
154 reason: format!(
155 "failed to coerce trace column '{}' in table '{}' from {:?} to {:?}",
156 pending_rewrite.column_name,
157 table_name,
158 request_type,
159 pending_rewrite.target_type
160 ),
161 }
162 .build()
163 })?;
164 }
165 }
166
167 Ok(())
168}
169
170pub(super) fn enrich_trace_reconcile_error(
171 table_name: &str,
172 column_name: &str,
173 observed_types: &[ColumnDataType],
174 existing_type: Option<ColumnDataType>,
175 fixed_type: Option<ColumnDataType>,
176) -> servers::error::Error {
177 let observed_types = observed_types
178 .iter()
179 .map(|datatype| format!("{datatype:?}"))
180 .collect::<Vec<_>>()
181 .join(", ");
182
183 error::InvalidParameterSnafu {
184 reason: match (existing_type, fixed_type) {
185 (Some(existing_type), Some(fixed_type)) => format!(
186 "failed to reconcile trace column '{}' in table '{}' with observed types [{}] against existing {:?} and fixed semconv {:?}",
187 column_name, table_name, observed_types, existing_type, fixed_type
188 ),
189 (Some(existing_type), None) => format!(
190 "failed to reconcile trace column '{}' in table '{}' with observed types [{}] against existing {:?}",
191 column_name, table_name, observed_types, existing_type
192 ),
193 (None, Some(fixed_type)) => format!(
194 "failed to reconcile trace column '{}' in table '{}' with observed types [{}] and fixed semconv {:?}",
195 column_name, table_name, observed_types, fixed_type
196 ),
197 (None, None) => format!(
198 "failed to reconcile trace column '{}' in table '{}' with observed types [{}]",
199 column_name, table_name, observed_types
200 ),
201 },
202 }
203 .build()
204}
205
206pub(super) fn is_trace_reconcile_candidate_type(datatype: ColumnDataType) -> bool {
209 matches!(
210 datatype,
211 ColumnDataType::String
212 | ColumnDataType::Boolean
213 | ColumnDataType::Int64
214 | ColumnDataType::Float64
215 )
216}
217
218pub(super) fn push_observed_trace_type(
220 observed_types: &mut Vec<ColumnDataType>,
221 datatype: ColumnDataType,
222) {
223 if !observed_types.contains(&datatype) {
224 observed_types.push(datatype);
225 }
226}
227
/// Unit tests for the trace reconcile helpers in this module.
#[cfg(test)]
mod tests {
    use api::v1::value::ValueData;
    use api::v1::{ColumnDataType, Row, Value};
    use common_error::ext::ErrorExt;
    use common_error::status_code::StatusCode;

    use super::{
        PendingTraceColumnRewrite, TraceReconcileDecision, choose_trace_reconcile_decision,
        enrich_trace_reconcile_error, is_trace_reconcile_candidate_type, push_observed_trace_type,
        validate_trace_column_rewrites,
    };

    #[test]
    fn test_trace_reconcile_decision_accessors() {
        let keep = TraceReconcileDecision::UseExisting(ColumnDataType::Int64);
        assert_eq!(keep.target_type(), ColumnDataType::Int64);
        assert!(!keep.requires_alter());

        let local = TraceReconcileDecision::UseRequestLocal(ColumnDataType::String);
        assert_eq!(local.target_type(), ColumnDataType::String);
        assert!(!local.requires_alter());

        let alter = TraceReconcileDecision::AlterExistingTo(ColumnDataType::Float64);
        assert_eq!(alter.target_type(), ColumnDataType::Float64);
        assert!(alter.requires_alter());
    }

    #[test]
    fn test_choose_trace_reconcile_decision_existing_int64_keeps_int64() {
        assert_eq!(
            choose_trace_reconcile_decision(
                "span_attributes.attr_int",
                &[ColumnDataType::Int64],
                Some(ColumnDataType::Int64)
            )
            .unwrap(),
            Some(TraceReconcileDecision::UseExisting(ColumnDataType::Int64))
        );
    }

    #[test]
    fn test_choose_trace_reconcile_decision_existing_int64_widens_to_float64() {
        assert_eq!(
            choose_trace_reconcile_decision(
                "span_attributes.attr_double",
                &[ColumnDataType::Int64, ColumnDataType::Float64],
                Some(ColumnDataType::Int64)
            )
            .unwrap(),
            Some(TraceReconcileDecision::AlterExistingTo(
                ColumnDataType::Float64
            ))
        );
    }

    #[test]
    fn test_choose_trace_reconcile_decision_existing_float64_stays_authoritative() {
        assert_eq!(
            choose_trace_reconcile_decision(
                "span_attributes.attr_double",
                &[ColumnDataType::Int64, ColumnDataType::Float64],
                Some(ColumnDataType::Float64)
            )
            .unwrap(),
            Some(TraceReconcileDecision::UseExisting(ColumnDataType::Float64))
        );
    }

    #[test]
    fn test_choose_trace_reconcile_decision_existing_int64_with_boolean_is_error() {
        let err = choose_trace_reconcile_decision(
            "span_attributes.attr_numeric",
            &[ColumnDataType::Boolean, ColumnDataType::Int64],
            Some(ColumnDataType::Int64),
        )
        .unwrap_err();
        assert_eq!(err.status_code(), StatusCode::InvalidArguments);
    }

    #[test]
    fn test_choose_trace_reconcile_decision_request_local_prefers_float64() {
        assert_eq!(
            choose_trace_reconcile_decision(
                "span_attributes.attr_numeric",
                &[ColumnDataType::Int64, ColumnDataType::Float64],
                None
            )
            .unwrap(),
            Some(TraceReconcileDecision::UseRequestLocal(
                ColumnDataType::Float64
            ))
        );
    }

    #[test]
    fn test_choose_trace_reconcile_decision_whitelisted_new_int64_column_uses_fixed_type() {
        assert_eq!(
            choose_trace_reconcile_decision(
                "span_attributes.http.response.status_code",
                &[ColumnDataType::String, ColumnDataType::Int64],
                None
            )
            .unwrap(),
            Some(TraceReconcileDecision::UseRequestLocal(
                ColumnDataType::Int64
            ))
        );
    }

    #[test]
    fn test_choose_trace_reconcile_decision_new_boolean_column_uses_dynamic_resolution() {
        assert_eq!(
            choose_trace_reconcile_decision(
                "span_attributes.messaging.destination.temporary",
                &[ColumnDataType::String, ColumnDataType::Boolean],
                None
            )
            .unwrap(),
            Some(TraceReconcileDecision::UseRequestLocal(
                ColumnDataType::Boolean
            ))
        );
    }

    #[test]
    fn test_choose_trace_reconcile_decision_whitelisted_existing_matching_type_uses_fixed_type() {
        assert_eq!(
            choose_trace_reconcile_decision(
                "resource_attributes.service.name",
                &[ColumnDataType::String],
                Some(ColumnDataType::String)
            )
            .unwrap(),
            Some(TraceReconcileDecision::UseExisting(ColumnDataType::String))
        );
    }

    #[test]
    fn test_choose_trace_reconcile_decision_whitelisted_existing_conflicting_type_is_error() {
        let err = choose_trace_reconcile_decision(
            "span_attributes.server.port",
            &[ColumnDataType::Int64],
            Some(ColumnDataType::String),
        )
        .unwrap_err();
        assert_eq!(err.status_code(), StatusCode::InvalidArguments);
    }

    #[test]
    fn test_choose_trace_reconcile_decision_non_whitelisted_retains_dynamic_behavior() {
        assert_eq!(
            choose_trace_reconcile_decision(
                "span_attributes.attr_numeric",
                &[ColumnDataType::Int64, ColumnDataType::Float64],
                None
            )
            .unwrap(),
            Some(TraceReconcileDecision::UseRequestLocal(
                ColumnDataType::Float64
            ))
        );
    }

    #[test]
    fn test_validate_trace_column_rewrites_rejects_invalid_string_parse() {
        let rows = vec![Row {
            values: vec![Value {
                value_data: Some(ValueData::StringValue("not_a_number".to_string())),
            }],
        }];
        let pending_rewrites = vec![PendingTraceColumnRewrite {
            col_idx: 0,
            target_type: ColumnDataType::Int64,
            column_name: "span_attributes.attr_int".to_string(),
        }];

        let err = validate_trace_column_rewrites(&rows, &pending_rewrites, "trace_type_atomicity")
            .unwrap_err();
        assert_eq!(err.status_code(), StatusCode::InvalidArguments);
    }

    #[test]
    fn test_validate_trace_column_rewrites_whitelisted_values_validate_against_fixed_type() {
        let rows = vec![Row {
            values: vec![Value {
                value_data: Some(ValueData::StringValue("503".to_string())),
            }],
        }];
        let pending_rewrites = vec![PendingTraceColumnRewrite {
            col_idx: 0,
            target_type: ColumnDataType::Int64,
            column_name: "span_attributes.http.response.status_code".to_string(),
        }];

        validate_trace_column_rewrites(&rows, &pending_rewrites, "trace_type_atomicity").unwrap();
    }

    #[test]
    fn test_validate_trace_column_rewrites_whitelisted_boolean_rejects_invalid_string_parse() {
        let rows = vec![Row {
            values: vec![Value {
                value_data: Some(ValueData::StringValue("not_a_bool".to_string())),
            }],
        }];
        let pending_rewrites = vec![PendingTraceColumnRewrite {
            col_idx: 0,
            target_type: ColumnDataType::Boolean,
            column_name: "span_attributes.messaging.destination.temporary".to_string(),
        }];

        let err = validate_trace_column_rewrites(&rows, &pending_rewrites, "trace_type_atomicity")
            .unwrap_err();
        assert_eq!(err.status_code(), StatusCode::InvalidArguments);
    }

    #[test]
    fn test_validate_trace_column_rewrites_without_pending_rewrites_is_ok() {
        // With nothing to rewrite, even an unparsable value must not be inspected.
        let rows = vec![Row {
            values: vec![Value {
                value_data: Some(ValueData::StringValue("not_a_number".to_string())),
            }],
        }];

        validate_trace_column_rewrites(&rows, &[], "trace_type_atomicity").unwrap();
    }

    #[test]
    fn test_validate_trace_column_rewrites_skips_missing_and_null_cells() {
        // Row 0 has no cell at index 1 at all; row 1 has a cell without data.
        let rows = vec![
            Row {
                values: vec![Value { value_data: None }],
            },
            Row {
                values: vec![Value { value_data: None }, Value { value_data: None }],
            },
        ];
        let pending_rewrites = vec![PendingTraceColumnRewrite {
            col_idx: 1,
            target_type: ColumnDataType::Int64,
            column_name: "span_attributes.attr_int".to_string(),
        }];

        validate_trace_column_rewrites(&rows, &pending_rewrites, "trace_type_atomicity").unwrap();
    }

    #[test]
    fn test_enrich_trace_reconcile_error_includes_existing_type() {
        let err = enrich_trace_reconcile_error(
            "trace_type_atomicity",
            "span_attributes.attr_int",
            &[ColumnDataType::String, ColumnDataType::Int64],
            Some(ColumnDataType::Boolean),
            None,
        );

        assert_eq!(err.status_code(), StatusCode::InvalidArguments);
        assert!(err.to_string().contains("span_attributes.attr_int"));
        assert!(err.to_string().contains("Boolean"));
    }

    #[test]
    fn test_enrich_trace_reconcile_error_includes_fixed_semconv_type() {
        let err = enrich_trace_reconcile_error(
            "trace_type_atomicity",
            "span_attributes.server.port",
            &[ColumnDataType::String, ColumnDataType::Int64],
            Some(ColumnDataType::String),
            Some(ColumnDataType::Int64),
        );

        assert_eq!(err.status_code(), StatusCode::InvalidArguments);
        assert!(err.to_string().contains("span_attributes.server.port"));
        assert!(err.to_string().contains("fixed semconv Int64"));
    }

    #[test]
    fn test_enrich_trace_reconcile_error_fixed_type_without_existing_type() {
        let err = enrich_trace_reconcile_error(
            "trace_type_atomicity",
            "span_attributes.attr_double",
            &[ColumnDataType::String],
            None,
            Some(ColumnDataType::Float64),
        );
        let message = err.to_string();

        assert_eq!(err.status_code(), StatusCode::InvalidArguments);
        assert!(message.contains("and fixed semconv Float64"));
        assert!(!message.contains("against existing"));
    }

    #[test]
    fn test_enrich_trace_reconcile_error_without_existing_or_fixed_type() {
        let err = enrich_trace_reconcile_error(
            "trace_type_atomicity",
            "span_attributes.attr_int",
            &[ColumnDataType::String, ColumnDataType::Int64],
            None,
            None,
        );
        let message = err.to_string();

        assert_eq!(err.status_code(), StatusCode::InvalidArguments);
        assert!(message.contains("trace_type_atomicity"));
        assert!(message.contains("with observed types [String, Int64]"));
        assert!(!message.contains("against existing"));
        assert!(!message.contains("fixed semconv"));
    }

    #[test]
    fn test_is_trace_reconcile_candidate_type_filters_non_scalar_types() {
        assert!(is_trace_reconcile_candidate_type(ColumnDataType::String));
        assert!(is_trace_reconcile_candidate_type(ColumnDataType::Boolean));
        assert!(is_trace_reconcile_candidate_type(ColumnDataType::Int64));
        assert!(is_trace_reconcile_candidate_type(ColumnDataType::Float64));
        assert!(!is_trace_reconcile_candidate_type(ColumnDataType::Binary));
        assert!(!is_trace_reconcile_candidate_type(
            ColumnDataType::TimestampMillisecond
        ));
    }

    #[test]
    fn test_push_observed_trace_type_deduplicates_types() {
        let mut observed_types = Vec::new();

        push_observed_trace_type(&mut observed_types, ColumnDataType::Int64);
        push_observed_trace_type(&mut observed_types, ColumnDataType::Int64);
        push_observed_trace_type(&mut observed_types, ColumnDataType::Float64);

        assert_eq!(
            observed_types,
            vec![ColumnDataType::Int64, ColumnDataType::Float64]
        );
    }
}