Skip to main content

frontend/instance/otlp/
trace_types.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

15use api::v1::{ColumnDataType, Row};
16use servers::error::{self, Result as ServerResult};
17use servers::otlp::trace::coerce::{
18    coerce_value_data, is_supported_trace_coercion, resolve_new_trace_column_type,
19    trace_value_datatype,
20};
21
22use crate::instance::otlp::trace_semconv::trace_semconv_fixed_type;
23
24#[derive(Debug, Clone, Copy, PartialEq, Eq)]
25pub(super) enum TraceReconcileDecision {
26    UseExisting(ColumnDataType),
27    UseRequestLocal(ColumnDataType),
28    AlterExistingTo(ColumnDataType),
29}
30
31impl TraceReconcileDecision {
32    pub(super) fn target_type(self) -> ColumnDataType {
33        match self {
34            Self::UseExisting(target_type)
35            | Self::UseRequestLocal(target_type)
36            | Self::AlterExistingTo(target_type) => target_type,
37        }
38    }
39
40    pub(super) fn requires_alter(self) -> bool {
41        matches!(self, Self::AlterExistingTo(_))
42    }
43}
44
45pub(super) struct PendingTraceColumnRewrite {
46    pub(super) col_idx: usize,
47    pub(super) target_type: ColumnDataType,
48    pub(super) column_name: String,
49}
50
51/// Picks the reconciliation action for one trace column.
52///
53/// Existing table schema is authoritative unless the only incompatible case is
54/// widening an existing Int64 column to Float64 for incoming Int64/Float64 data.
55pub(super) fn choose_trace_reconcile_decision(
56    column_name: &str,
57    observed_types: &[ColumnDataType],
58    existing_type: Option<ColumnDataType>,
59) -> ServerResult<Option<TraceReconcileDecision>> {
60    if let Some(fixed_type) = trace_semconv_fixed_type(column_name) {
61        return choose_fixed_trace_reconcile_decision(fixed_type, observed_types, existing_type);
62    }
63
64    let Some(existing_type) = existing_type else {
65        return resolve_new_trace_column_type(observed_types.iter().copied())
66            .map(|target_type| target_type.map(TraceReconcileDecision::UseRequestLocal))
67            .map_err(|_| {
68                error::InvalidParameterSnafu {
69                    reason: "unsupported trace type mix".to_string(),
70                }
71                .build()
72            });
73    };
74
75    if observed_types.iter().all(|&request_type| {
76        request_type == existing_type || is_supported_trace_coercion(request_type, existing_type)
77    }) {
78        return Ok(Some(TraceReconcileDecision::UseExisting(existing_type)));
79    }
80
81    if existing_type == ColumnDataType::Int64
82        && observed_types.contains(&ColumnDataType::Float64)
83        && observed_types.iter().all(|observed_type| {
84            matches!(
85                observed_type,
86                ColumnDataType::Int64 | ColumnDataType::Float64
87            )
88        })
89    {
90        return Ok(Some(TraceReconcileDecision::AlterExistingTo(
91            ColumnDataType::Float64,
92        )));
93    }
94
95    error::InvalidParameterSnafu {
96        reason: "unsupported trace type mix".to_string(),
97    }
98    .fail()
99}
100
101fn choose_fixed_trace_reconcile_decision(
102    fixed_type: ColumnDataType,
103    observed_types: &[ColumnDataType],
104    existing_type: Option<ColumnDataType>,
105) -> ServerResult<Option<TraceReconcileDecision>> {
106    let Some(existing_type) = existing_type else {
107        return Ok(Some(TraceReconcileDecision::UseRequestLocal(fixed_type)));
108    };
109
110    if existing_type == fixed_type {
111        return Ok(Some(TraceReconcileDecision::UseExisting(fixed_type)));
112    }
113
114    if fixed_type == ColumnDataType::Float64
115        && existing_type == ColumnDataType::Int64
116        && observed_types.iter().all(|observed_type| {
117            matches!(
118                observed_type,
119                ColumnDataType::Int64 | ColumnDataType::Float64
120            )
121        })
122    {
123        return Ok(Some(TraceReconcileDecision::AlterExistingTo(fixed_type)));
124    }
125
126    error::InvalidParameterSnafu {
127        reason: "unsupported trace type mix".to_string(),
128    }
129    .fail()
130}
131
132/// Validate all pending trace column rewrites before any schema mutation happens.
133pub(super) fn validate_trace_column_rewrites(
134    rows: &[Row],
135    pending_rewrites: &[PendingTraceColumnRewrite],
136    table_name: &str,
137) -> ServerResult<()> {
138    for row in rows {
139        for pending_rewrite in pending_rewrites {
140            let Some(value) = row.values.get(pending_rewrite.col_idx) else {
141                continue;
142            };
143            let Some(request_type) = value.value_data.as_ref().and_then(trace_value_datatype)
144            else {
145                continue;
146            };
147            if request_type == pending_rewrite.target_type {
148                continue;
149            }
150
151            coerce_value_data(&value.value_data, pending_rewrite.target_type, request_type)
152                .map_err(|_| {
153                    error::InvalidParameterSnafu {
154                        reason: format!(
155                            "failed to coerce trace column '{}' in table '{}' from {:?} to {:?}",
156                            pending_rewrite.column_name,
157                            table_name,
158                            request_type,
159                            pending_rewrite.target_type
160                        ),
161                    }
162                    .build()
163                })?;
164        }
165    }
166
167    Ok(())
168}
169
170pub(super) fn enrich_trace_reconcile_error(
171    table_name: &str,
172    column_name: &str,
173    observed_types: &[ColumnDataType],
174    existing_type: Option<ColumnDataType>,
175    fixed_type: Option<ColumnDataType>,
176) -> servers::error::Error {
177    let observed_types = observed_types
178        .iter()
179        .map(|datatype| format!("{datatype:?}"))
180        .collect::<Vec<_>>()
181        .join(", ");
182
183    error::InvalidParameterSnafu {
184        reason: match (existing_type, fixed_type) {
185            (Some(existing_type), Some(fixed_type)) => format!(
186                "failed to reconcile trace column '{}' in table '{}' with observed types [{}] against existing {:?} and fixed semconv {:?}",
187                column_name, table_name, observed_types, existing_type, fixed_type
188            ),
189            (Some(existing_type), None) => format!(
190                "failed to reconcile trace column '{}' in table '{}' with observed types [{}] against existing {:?}",
191                column_name, table_name, observed_types, existing_type
192            ),
193            (None, Some(fixed_type)) => format!(
194                "failed to reconcile trace column '{}' in table '{}' with observed types [{}] and fixed semconv {:?}",
195                column_name, table_name, observed_types, fixed_type
196            ),
197            (None, None) => format!(
198                "failed to reconcile trace column '{}' in table '{}' with observed types [{}]",
199                column_name, table_name, observed_types
200            ),
201        },
202    }
203    .build()
204}
205
206/// Only these trace scalar types participate in reconciliation. Other column kinds
207/// such as JSON and binary keep their original write path and schema checks.
208pub(super) fn is_trace_reconcile_candidate_type(datatype: ColumnDataType) -> bool {
209    matches!(
210        datatype,
211        ColumnDataType::String
212            | ColumnDataType::Boolean
213            | ColumnDataType::Int64
214            | ColumnDataType::Float64
215    )
216}
217
218/// Keeps the observed type list small without depending on enum ordering.
219pub(super) fn push_observed_trace_type(
220    observed_types: &mut Vec<ColumnDataType>,
221    datatype: ColumnDataType,
222) {
223    if !observed_types.contains(&datatype) {
224        observed_types.push(datatype);
225    }
226}
227
#[cfg(test)]
mod tests {
    use api::v1::value::ValueData;
    use api::v1::{ColumnDataType, Row, Value};
    use common_error::ext::ErrorExt;
    use common_error::status_code::StatusCode;

    use super::{
        PendingTraceColumnRewrite, TraceReconcileDecision, choose_trace_reconcile_decision,
        enrich_trace_reconcile_error, is_trace_reconcile_candidate_type, push_observed_trace_type,
        validate_trace_column_rewrites,
    };

    #[test]
    fn test_choose_trace_reconcile_decision_existing_int64_keeps_int64() {
        assert_eq!(
            choose_trace_reconcile_decision(
                "span_attributes.attr_int",
                &[ColumnDataType::Int64],
                Some(ColumnDataType::Int64)
            )
            .unwrap(),
            Some(TraceReconcileDecision::UseExisting(ColumnDataType::Int64))
        );
    }

    #[test]
    fn test_choose_trace_reconcile_decision_existing_int64_widens_to_float64() {
        assert_eq!(
            choose_trace_reconcile_decision(
                "span_attributes.attr_double",
                &[ColumnDataType::Int64, ColumnDataType::Float64],
                Some(ColumnDataType::Int64)
            )
            .unwrap(),
            Some(TraceReconcileDecision::AlterExistingTo(
                ColumnDataType::Float64
            ))
        );
    }

    #[test]
    fn test_choose_trace_reconcile_decision_existing_float64_stays_authoritative() {
        assert_eq!(
            choose_trace_reconcile_decision(
                "span_attributes.attr_double",
                &[ColumnDataType::Int64, ColumnDataType::Float64],
                Some(ColumnDataType::Float64)
            )
            .unwrap(),
            Some(TraceReconcileDecision::UseExisting(ColumnDataType::Float64))
        );
    }

    #[test]
    fn test_choose_trace_reconcile_decision_existing_int64_with_boolean_is_error() {
        let err = choose_trace_reconcile_decision(
            "span_attributes.attr_numeric",
            &[ColumnDataType::Boolean, ColumnDataType::Int64],
            Some(ColumnDataType::Int64),
        )
        .unwrap_err();
        assert_eq!(err.status_code(), StatusCode::InvalidArguments);
    }

    #[test]
    fn test_choose_trace_reconcile_decision_request_local_prefers_float64() {
        assert_eq!(
            choose_trace_reconcile_decision(
                "span_attributes.attr_numeric",
                &[ColumnDataType::Int64, ColumnDataType::Float64],
                None
            )
            .unwrap(),
            Some(TraceReconcileDecision::UseRequestLocal(
                ColumnDataType::Float64
            ))
        );
    }

    #[test]
    fn test_choose_trace_reconcile_decision_whitelisted_new_int64_column_uses_fixed_type() {
        assert_eq!(
            choose_trace_reconcile_decision(
                "span_attributes.http.response.status_code",
                &[ColumnDataType::String, ColumnDataType::Int64],
                None
            )
            .unwrap(),
            Some(TraceReconcileDecision::UseRequestLocal(
                ColumnDataType::Int64
            ))
        );
    }

    #[test]
    fn test_choose_trace_reconcile_decision_new_boolean_column_uses_dynamic_resolution() {
        assert_eq!(
            choose_trace_reconcile_decision(
                "span_attributes.messaging.destination.temporary",
                &[ColumnDataType::String, ColumnDataType::Boolean],
                None
            )
            .unwrap(),
            Some(TraceReconcileDecision::UseRequestLocal(
                ColumnDataType::Boolean
            ))
        );
    }

    #[test]
    fn test_choose_trace_reconcile_decision_whitelisted_existing_matching_type_uses_fixed_type() {
        assert_eq!(
            choose_trace_reconcile_decision(
                "resource_attributes.service.name",
                &[ColumnDataType::String],
                Some(ColumnDataType::String)
            )
            .unwrap(),
            Some(TraceReconcileDecision::UseExisting(ColumnDataType::String))
        );
    }

    #[test]
    fn test_choose_trace_reconcile_decision_whitelisted_existing_conflicting_type_is_error() {
        let err = choose_trace_reconcile_decision(
            "span_attributes.server.port",
            &[ColumnDataType::Int64],
            Some(ColumnDataType::String),
        )
        .unwrap_err();
        assert_eq!(err.status_code(), StatusCode::InvalidArguments);
    }

    #[test]
    fn test_choose_trace_reconcile_decision_non_whitelisted_retains_dynamic_behavior() {
        assert_eq!(
            choose_trace_reconcile_decision(
                "span_attributes.attr_numeric",
                &[ColumnDataType::Int64, ColumnDataType::Float64],
                None
            )
            .unwrap(),
            Some(TraceReconcileDecision::UseRequestLocal(
                ColumnDataType::Float64
            ))
        );
    }

    #[test]
    fn test_trace_reconcile_decision_accessors() {
        // Each variant exposes its carried type; only `AlterExistingTo` requires an alter.
        let cases = [
            (
                TraceReconcileDecision::UseExisting(ColumnDataType::Int64),
                ColumnDataType::Int64,
                false,
            ),
            (
                TraceReconcileDecision::UseRequestLocal(ColumnDataType::String),
                ColumnDataType::String,
                false,
            ),
            (
                TraceReconcileDecision::AlterExistingTo(ColumnDataType::Float64),
                ColumnDataType::Float64,
                true,
            ),
        ];

        for (decision, expected_type, expected_alter) in cases {
            assert_eq!(decision.target_type(), expected_type);
            assert_eq!(decision.requires_alter(), expected_alter);
        }
    }

    #[test]
    fn test_validate_trace_column_rewrites_rejects_invalid_string_parse() {
        let rows = vec![Row {
            values: vec![Value {
                value_data: Some(ValueData::StringValue("not_a_number".to_string())),
            }],
        }];
        let pending_rewrites = vec![PendingTraceColumnRewrite {
            col_idx: 0,
            target_type: ColumnDataType::Int64,
            column_name: "span_attributes.attr_int".to_string(),
        }];

        let err = validate_trace_column_rewrites(&rows, &pending_rewrites, "trace_type_atomicity")
            .unwrap_err();
        assert_eq!(err.status_code(), StatusCode::InvalidArguments);
    }

    #[test]
    fn test_validate_trace_column_rewrites_whitelisted_values_validate_against_fixed_type() {
        let rows = vec![Row {
            values: vec![Value {
                value_data: Some(ValueData::StringValue("503".to_string())),
            }],
        }];
        let pending_rewrites = vec![PendingTraceColumnRewrite {
            col_idx: 0,
            target_type: ColumnDataType::Int64,
            column_name: "span_attributes.http.response.status_code".to_string(),
        }];

        validate_trace_column_rewrites(&rows, &pending_rewrites, "trace_type_atomicity").unwrap();
    }

    #[test]
    fn test_validate_trace_column_rewrites_whitelisted_boolean_rejects_invalid_string_parse() {
        let rows = vec![Row {
            values: vec![Value {
                value_data: Some(ValueData::StringValue("not_a_bool".to_string())),
            }],
        }];
        let pending_rewrites = vec![PendingTraceColumnRewrite {
            col_idx: 0,
            target_type: ColumnDataType::Boolean,
            column_name: "span_attributes.messaging.destination.temporary".to_string(),
        }];

        let err = validate_trace_column_rewrites(&rows, &pending_rewrites, "trace_type_atomicity")
            .unwrap_err();
        assert_eq!(err.status_code(), StatusCode::InvalidArguments);
    }

    #[test]
    fn test_validate_trace_column_rewrites_skips_missing_and_null_values() {
        // A null value and an out-of-range column index are both skipped, not rejected.
        let rows = vec![Row {
            values: vec![Value { value_data: None }],
        }];
        let pending_rewrites = vec![
            PendingTraceColumnRewrite {
                col_idx: 0,
                target_type: ColumnDataType::Int64,
                column_name: "span_attributes.attr_int".to_string(),
            },
            PendingTraceColumnRewrite {
                col_idx: 3,
                target_type: ColumnDataType::Int64,
                column_name: "span_attributes.attr_missing".to_string(),
            },
        ];

        validate_trace_column_rewrites(&rows, &pending_rewrites, "trace_type_atomicity").unwrap();
    }

    #[test]
    fn test_enrich_trace_reconcile_error_includes_existing_type() {
        let err = enrich_trace_reconcile_error(
            "trace_type_atomicity",
            "span_attributes.attr_int",
            &[ColumnDataType::String, ColumnDataType::Int64],
            Some(ColumnDataType::Boolean),
            None,
        );

        assert_eq!(err.status_code(), StatusCode::InvalidArguments);
        assert!(err.to_string().contains("span_attributes.attr_int"));
        assert!(err.to_string().contains("Boolean"));
    }

    #[test]
    fn test_enrich_trace_reconcile_error_includes_fixed_semconv_type() {
        let err = enrich_trace_reconcile_error(
            "trace_type_atomicity",
            "span_attributes.server.port",
            &[ColumnDataType::String, ColumnDataType::Int64],
            Some(ColumnDataType::String),
            Some(ColumnDataType::Int64),
        );

        assert_eq!(err.status_code(), StatusCode::InvalidArguments);
        assert!(err.to_string().contains("span_attributes.server.port"));
        assert!(err.to_string().contains("fixed semconv Int64"));
    }

    #[test]
    fn test_enrich_trace_reconcile_error_without_existing_or_fixed_type() {
        let err = enrich_trace_reconcile_error(
            "trace_type_atomicity",
            "span_attributes.attr_mixed",
            &[ColumnDataType::String, ColumnDataType::Boolean],
            None,
            None,
        );
        let message = err.to_string();

        assert_eq!(err.status_code(), StatusCode::InvalidArguments);
        assert!(message.contains("span_attributes.attr_mixed"));
        assert!(message.contains("with observed types [String, Boolean]"));
        assert!(!message.contains("against existing"));
        assert!(!message.contains("fixed semconv"));
    }

    #[test]
    fn test_is_trace_reconcile_candidate_type_filters_non_scalar_types() {
        assert!(is_trace_reconcile_candidate_type(ColumnDataType::String));
        assert!(is_trace_reconcile_candidate_type(ColumnDataType::Boolean));
        assert!(!is_trace_reconcile_candidate_type(ColumnDataType::Binary));
        assert!(!is_trace_reconcile_candidate_type(
            ColumnDataType::TimestampMillisecond
        ));
    }

    #[test]
    fn test_push_observed_trace_type_deduplicates_types() {
        let mut observed_types = Vec::new();

        push_observed_trace_type(&mut observed_types, ColumnDataType::Int64);
        push_observed_trace_type(&mut observed_types, ColumnDataType::Int64);
        push_observed_trace_type(&mut observed_types, ColumnDataType::Float64);

        assert_eq!(
            observed_types,
            vec![ColumnDataType::Int64, ColumnDataType::Float64]
        );
    }
}