1use api::v1::{ColumnDataType, Row};
16use servers::error::{self, Result as ServerResult};
17use servers::otlp::trace::coerce::{
18 coerce_value_data, is_supported_trace_coercion, resolve_new_trace_column_type,
19 trace_value_datatype,
20};
21
22#[derive(Debug, Clone, Copy, PartialEq, Eq)]
23pub(super) enum TraceReconcileDecision {
24 UseExisting(ColumnDataType),
25 UseRequestLocal(ColumnDataType),
26 AlterExistingTo(ColumnDataType),
27}
28
29impl TraceReconcileDecision {
30 pub(super) fn target_type(self) -> ColumnDataType {
31 match self {
32 Self::UseExisting(target_type)
33 | Self::UseRequestLocal(target_type)
34 | Self::AlterExistingTo(target_type) => target_type,
35 }
36 }
37
38 pub(super) fn requires_alter(self) -> bool {
39 matches!(self, Self::AlterExistingTo(_))
40 }
41}
42
43pub(super) struct PendingTraceColumnRewrite {
44 pub(super) col_idx: usize,
45 pub(super) target_type: ColumnDataType,
46 pub(super) column_name: String,
47}
48
49pub(super) fn choose_trace_reconcile_decision(
54 observed_types: &[ColumnDataType],
55 existing_type: Option<ColumnDataType>,
56) -> ServerResult<Option<TraceReconcileDecision>> {
57 let Some(existing_type) = existing_type else {
58 return resolve_new_trace_column_type(observed_types.iter().copied())
59 .map(|target_type| target_type.map(TraceReconcileDecision::UseRequestLocal))
60 .map_err(|_| {
61 error::InvalidParameterSnafu {
62 reason: "unsupported trace type mix".to_string(),
63 }
64 .build()
65 });
66 };
67
68 if observed_types.iter().all(|&request_type| {
69 request_type == existing_type || is_supported_trace_coercion(request_type, existing_type)
70 }) {
71 return Ok(Some(TraceReconcileDecision::UseExisting(existing_type)));
72 }
73
74 if existing_type == ColumnDataType::Int64
75 && observed_types.contains(&ColumnDataType::Float64)
76 && observed_types.iter().all(|observed_type| {
77 matches!(
78 observed_type,
79 ColumnDataType::Int64 | ColumnDataType::Float64
80 )
81 })
82 {
83 return Ok(Some(TraceReconcileDecision::AlterExistingTo(
84 ColumnDataType::Float64,
85 )));
86 }
87
88 error::InvalidParameterSnafu {
89 reason: "unsupported trace type mix".to_string(),
90 }
91 .fail()
92}
93
94pub(super) fn validate_trace_column_rewrites(
96 rows: &[Row],
97 pending_rewrites: &[PendingTraceColumnRewrite],
98 table_name: &str,
99) -> ServerResult<()> {
100 for row in rows {
101 for pending_rewrite in pending_rewrites {
102 let Some(value) = row.values.get(pending_rewrite.col_idx) else {
103 continue;
104 };
105 let Some(request_type) = value.value_data.as_ref().and_then(trace_value_datatype)
106 else {
107 continue;
108 };
109 if request_type == pending_rewrite.target_type {
110 continue;
111 }
112
113 coerce_value_data(&value.value_data, pending_rewrite.target_type, request_type)
114 .map_err(|_| {
115 error::InvalidParameterSnafu {
116 reason: format!(
117 "failed to coerce trace column '{}' in table '{}' from {:?} to {:?}",
118 pending_rewrite.column_name,
119 table_name,
120 request_type,
121 pending_rewrite.target_type
122 ),
123 }
124 .build()
125 })?;
126 }
127 }
128
129 Ok(())
130}
131
132pub(super) fn enrich_trace_reconcile_error(
133 table_name: &str,
134 column_name: &str,
135 observed_types: &[ColumnDataType],
136 existing_type: Option<ColumnDataType>,
137) -> servers::error::Error {
138 let observed_types = observed_types
139 .iter()
140 .map(|datatype| format!("{datatype:?}"))
141 .collect::<Vec<_>>()
142 .join(", ");
143
144 error::InvalidParameterSnafu {
145 reason: match existing_type {
146 Some(existing_type) => format!(
147 "failed to reconcile trace column '{}' in table '{}' with observed types [{}] against existing {:?}",
148 column_name, table_name, observed_types, existing_type
149 ),
150 None => format!(
151 "failed to reconcile trace column '{}' in table '{}' with observed types [{}]",
152 column_name, table_name, observed_types
153 ),
154 },
155 }
156 .build()
157}
158
159pub(super) fn is_trace_reconcile_candidate_type(datatype: ColumnDataType) -> bool {
162 matches!(
163 datatype,
164 ColumnDataType::String
165 | ColumnDataType::Boolean
166 | ColumnDataType::Int64
167 | ColumnDataType::Float64
168 )
169}
170
171pub(super) fn push_observed_trace_type(
173 observed_types: &mut Vec<ColumnDataType>,
174 datatype: ColumnDataType,
175) {
176 if !observed_types.contains(&datatype) {
177 observed_types.push(datatype);
178 }
179}
180
/// Unit tests for the trace column type reconciliation helpers.
#[cfg(test)]
mod tests {
    use api::v1::value::ValueData;
    use api::v1::{ColumnDataType, Row, Value};
    use common_error::ext::ErrorExt;
    use common_error::status_code::StatusCode;

    use super::{
        PendingTraceColumnRewrite, TraceReconcileDecision, choose_trace_reconcile_decision,
        enrich_trace_reconcile_error, is_trace_reconcile_candidate_type, push_observed_trace_type,
        validate_trace_column_rewrites,
    };

    /// An `Int64` request against an `Int64` column keeps the column type.
    #[test]
    fn test_choose_trace_reconcile_decision_existing_int64_keeps_int64() {
        let observed = [ColumnDataType::Int64];

        let decision =
            choose_trace_reconcile_decision(&observed, Some(ColumnDataType::Int64)).unwrap();

        assert_eq!(
            decision,
            Some(TraceReconcileDecision::UseExisting(ColumnDataType::Int64))
        );
    }

    /// Mixed `Int64`/`Float64` values against an `Int64` column widen it.
    #[test]
    fn test_choose_trace_reconcile_decision_existing_int64_widens_to_float64() {
        let observed = [ColumnDataType::Int64, ColumnDataType::Float64];

        let decision =
            choose_trace_reconcile_decision(&observed, Some(ColumnDataType::Int64)).unwrap();

        assert_eq!(
            decision,
            Some(TraceReconcileDecision::AlterExistingTo(
                ColumnDataType::Float64
            ))
        );
    }

    /// Mixed `Int64`/`Float64` values against a `Float64` column keep it.
    #[test]
    fn test_choose_trace_reconcile_decision_existing_float64_stays_authoritative() {
        let observed = [ColumnDataType::Int64, ColumnDataType::Float64];

        let decision =
            choose_trace_reconcile_decision(&observed, Some(ColumnDataType::Float64)).unwrap();

        assert_eq!(
            decision,
            Some(TraceReconcileDecision::UseExisting(ColumnDataType::Float64))
        );
    }

    /// A `Boolean` value against an `Int64` column is rejected.
    #[test]
    fn test_choose_trace_reconcile_decision_existing_int64_with_boolean_is_error() {
        let observed = [ColumnDataType::Boolean, ColumnDataType::Int64];

        let err =
            choose_trace_reconcile_decision(&observed, Some(ColumnDataType::Int64)).unwrap_err();

        assert_eq!(err.status_code(), StatusCode::InvalidArguments);
    }

    /// Without an existing type, mixed `Int64`/`Float64` resolves to `Float64`.
    #[test]
    fn test_choose_trace_reconcile_decision_request_local_prefers_float64() {
        let observed = [ColumnDataType::Int64, ColumnDataType::Float64];

        let decision = choose_trace_reconcile_decision(&observed, None).unwrap();

        assert_eq!(
            decision,
            Some(TraceReconcileDecision::UseRequestLocal(
                ColumnDataType::Float64
            ))
        );
    }

    /// A non-numeric string cannot be rewritten to `Int64`.
    #[test]
    fn test_validate_trace_column_rewrites_rejects_invalid_string_parse() {
        let unparsable = Value {
            value_data: Some(ValueData::StringValue("not_a_number".to_string())),
        };
        let rows = vec![Row {
            values: vec![unparsable],
        }];
        let pending_rewrites = vec![PendingTraceColumnRewrite {
            col_idx: 0,
            target_type: ColumnDataType::Int64,
            column_name: "span_attributes.attr_int".to_string(),
        }];

        let outcome =
            validate_trace_column_rewrites(&rows, &pending_rewrites, "trace_type_atomicity");

        let err = outcome.unwrap_err();
        assert_eq!(err.status_code(), StatusCode::InvalidArguments);
    }

    /// The enriched error mentions both the column name and the existing type.
    #[test]
    fn test_enrich_trace_reconcile_error_includes_existing_type() {
        let observed = [ColumnDataType::String, ColumnDataType::Int64];

        let err = enrich_trace_reconcile_error(
            "trace_type_atomicity",
            "span_attributes.attr_int",
            &observed,
            Some(ColumnDataType::Boolean),
        );

        assert_eq!(err.status_code(), StatusCode::InvalidArguments);
        assert!(err.to_string().contains("span_attributes.attr_int"));
        assert!(err.to_string().contains("Boolean"));
    }

    /// `String` and `Boolean` are candidates; `Binary` and timestamps are not.
    #[test]
    fn test_is_trace_reconcile_candidate_type_filters_non_scalar_types() {
        for accepted in [ColumnDataType::String, ColumnDataType::Boolean] {
            assert!(is_trace_reconcile_candidate_type(accepted));
        }
        for rejected in [
            ColumnDataType::Binary,
            ColumnDataType::TimestampMillisecond,
        ] {
            assert!(!is_trace_reconcile_candidate_type(rejected));
        }
    }

    /// Pushing the same type twice records it once, in first-seen order.
    #[test]
    fn test_push_observed_trace_type_deduplicates_types() {
        let mut observed_types = Vec::new();

        for datatype in [
            ColumnDataType::Int64,
            ColumnDataType::Int64,
            ColumnDataType::Float64,
        ] {
            push_observed_trace_type(&mut observed_types, datatype);
        }

        assert_eq!(
            observed_types,
            vec![ColumnDataType::Int64, ColumnDataType::Float64]
        );
    }
}