Skip to main content

frontend/instance/otlp/
trace_types.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use api::v1::{ColumnDataType, Row};
16use servers::error::{self, Result as ServerResult};
17use servers::otlp::trace::coerce::{
18    coerce_value_data, is_supported_trace_coercion, resolve_new_trace_column_type,
19    trace_value_datatype,
20};
21
22#[derive(Debug, Clone, Copy, PartialEq, Eq)]
23pub(super) enum TraceReconcileDecision {
24    UseExisting(ColumnDataType),
25    UseRequestLocal(ColumnDataType),
26    AlterExistingTo(ColumnDataType),
27}
28
29impl TraceReconcileDecision {
30    pub(super) fn target_type(self) -> ColumnDataType {
31        match self {
32            Self::UseExisting(target_type)
33            | Self::UseRequestLocal(target_type)
34            | Self::AlterExistingTo(target_type) => target_type,
35        }
36    }
37
38    pub(super) fn requires_alter(self) -> bool {
39        matches!(self, Self::AlterExistingTo(_))
40    }
41}
42
43pub(super) struct PendingTraceColumnRewrite {
44    pub(super) col_idx: usize,
45    pub(super) target_type: ColumnDataType,
46    pub(super) column_name: String,
47}
48
49/// Picks the reconciliation action for one trace column.
50///
51/// Existing table schema is authoritative unless the only incompatible case is
52/// widening an existing Int64 column to Float64 for incoming Int64/Float64 data.
53pub(super) fn choose_trace_reconcile_decision(
54    observed_types: &[ColumnDataType],
55    existing_type: Option<ColumnDataType>,
56) -> ServerResult<Option<TraceReconcileDecision>> {
57    let Some(existing_type) = existing_type else {
58        return resolve_new_trace_column_type(observed_types.iter().copied())
59            .map(|target_type| target_type.map(TraceReconcileDecision::UseRequestLocal))
60            .map_err(|_| {
61                error::InvalidParameterSnafu {
62                    reason: "unsupported trace type mix".to_string(),
63                }
64                .build()
65            });
66    };
67
68    if observed_types.iter().all(|&request_type| {
69        request_type == existing_type || is_supported_trace_coercion(request_type, existing_type)
70    }) {
71        return Ok(Some(TraceReconcileDecision::UseExisting(existing_type)));
72    }
73
74    if existing_type == ColumnDataType::Int64
75        && observed_types.contains(&ColumnDataType::Float64)
76        && observed_types.iter().all(|observed_type| {
77            matches!(
78                observed_type,
79                ColumnDataType::Int64 | ColumnDataType::Float64
80            )
81        })
82    {
83        return Ok(Some(TraceReconcileDecision::AlterExistingTo(
84            ColumnDataType::Float64,
85        )));
86    }
87
88    error::InvalidParameterSnafu {
89        reason: "unsupported trace type mix".to_string(),
90    }
91    .fail()
92}
93
94/// Validate all pending trace column rewrites before any schema mutation happens.
95pub(super) fn validate_trace_column_rewrites(
96    rows: &[Row],
97    pending_rewrites: &[PendingTraceColumnRewrite],
98    table_name: &str,
99) -> ServerResult<()> {
100    for row in rows {
101        for pending_rewrite in pending_rewrites {
102            let Some(value) = row.values.get(pending_rewrite.col_idx) else {
103                continue;
104            };
105            let Some(request_type) = value.value_data.as_ref().and_then(trace_value_datatype)
106            else {
107                continue;
108            };
109            if request_type == pending_rewrite.target_type {
110                continue;
111            }
112
113            coerce_value_data(&value.value_data, pending_rewrite.target_type, request_type)
114                .map_err(|_| {
115                    error::InvalidParameterSnafu {
116                        reason: format!(
117                            "failed to coerce trace column '{}' in table '{}' from {:?} to {:?}",
118                            pending_rewrite.column_name,
119                            table_name,
120                            request_type,
121                            pending_rewrite.target_type
122                        ),
123                    }
124                    .build()
125                })?;
126        }
127    }
128
129    Ok(())
130}
131
132pub(super) fn enrich_trace_reconcile_error(
133    table_name: &str,
134    column_name: &str,
135    observed_types: &[ColumnDataType],
136    existing_type: Option<ColumnDataType>,
137) -> servers::error::Error {
138    let observed_types = observed_types
139        .iter()
140        .map(|datatype| format!("{datatype:?}"))
141        .collect::<Vec<_>>()
142        .join(", ");
143
144    error::InvalidParameterSnafu {
145        reason: match existing_type {
146            Some(existing_type) => format!(
147                "failed to reconcile trace column '{}' in table '{}' with observed types [{}] against existing {:?}",
148                column_name, table_name, observed_types, existing_type
149            ),
150            None => format!(
151                "failed to reconcile trace column '{}' in table '{}' with observed types [{}]",
152                column_name, table_name, observed_types
153            ),
154        },
155    }
156    .build()
157}
158
159/// Only these trace scalar types participate in reconciliation. Other column kinds
160/// such as JSON and binary keep their original write path and schema checks.
161pub(super) fn is_trace_reconcile_candidate_type(datatype: ColumnDataType) -> bool {
162    matches!(
163        datatype,
164        ColumnDataType::String
165            | ColumnDataType::Boolean
166            | ColumnDataType::Int64
167            | ColumnDataType::Float64
168    )
169}
170
171/// Keeps the observed type list small without depending on enum ordering.
172pub(super) fn push_observed_trace_type(
173    observed_types: &mut Vec<ColumnDataType>,
174    datatype: ColumnDataType,
175) {
176    if !observed_types.contains(&datatype) {
177        observed_types.push(datatype);
178    }
179}
180
#[cfg(test)]
mod tests {
    use api::v1::value::ValueData;
    use api::v1::{ColumnDataType, Row, Value};
    use common_error::ext::ErrorExt;
    use common_error::status_code::StatusCode;

    use super::{
        PendingTraceColumnRewrite, TraceReconcileDecision, choose_trace_reconcile_decision,
        enrich_trace_reconcile_error, is_trace_reconcile_candidate_type, push_observed_trace_type,
        validate_trace_column_rewrites,
    };

    /// An Int64-only request against an Int64 column keeps the column as is.
    #[test]
    fn test_choose_trace_reconcile_decision_existing_int64_keeps_int64() {
        let observed = [ColumnDataType::Int64];

        let decision =
            choose_trace_reconcile_decision(&observed, Some(ColumnDataType::Int64)).unwrap();

        let expected = Some(TraceReconcileDecision::UseExisting(ColumnDataType::Int64));
        assert_eq!(decision, expected);
    }

    /// Mixed Int64/Float64 input against an Int64 column asks for a widening.
    #[test]
    fn test_choose_trace_reconcile_decision_existing_int64_widens_to_float64() {
        let observed = [ColumnDataType::Int64, ColumnDataType::Float64];

        let decision =
            choose_trace_reconcile_decision(&observed, Some(ColumnDataType::Int64)).unwrap();

        let expected = Some(TraceReconcileDecision::AlterExistingTo(
            ColumnDataType::Float64,
        ));
        assert_eq!(decision, expected);
    }

    /// Mixed Int64/Float64 input against a Float64 column leaves the schema alone.
    #[test]
    fn test_choose_trace_reconcile_decision_existing_float64_stays_authoritative() {
        let observed = [ColumnDataType::Int64, ColumnDataType::Float64];

        let decision =
            choose_trace_reconcile_decision(&observed, Some(ColumnDataType::Float64)).unwrap();

        let expected = Some(TraceReconcileDecision::UseExisting(ColumnDataType::Float64));
        assert_eq!(decision, expected);
    }

    /// A Boolean value aimed at an Int64 column is rejected as invalid arguments.
    #[test]
    fn test_choose_trace_reconcile_decision_existing_int64_with_boolean_is_error() {
        let observed = [ColumnDataType::Boolean, ColumnDataType::Int64];

        let outcome = choose_trace_reconcile_decision(&observed, Some(ColumnDataType::Int64));

        let err = outcome.unwrap_err();
        assert_eq!(err.status_code(), StatusCode::InvalidArguments);
    }

    /// With no existing column, mixed Int64/Float64 input resolves to Float64.
    #[test]
    fn test_choose_trace_reconcile_decision_request_local_prefers_float64() {
        let observed = [ColumnDataType::Int64, ColumnDataType::Float64];

        let decision = choose_trace_reconcile_decision(&observed, None).unwrap();

        let expected = Some(TraceReconcileDecision::UseRequestLocal(
            ColumnDataType::Float64,
        ));
        assert_eq!(decision, expected);
    }

    /// A non-numeric string cannot be rewritten into an Int64 column.
    #[test]
    fn test_validate_trace_column_rewrites_rejects_invalid_string_parse() {
        let bad_value = Value {
            value_data: Some(ValueData::StringValue("not_a_number".to_string())),
        };
        let rows = vec![Row {
            values: vec![bad_value],
        }];
        let rewrite = PendingTraceColumnRewrite {
            col_idx: 0,
            target_type: ColumnDataType::Int64,
            column_name: "span_attributes.attr_int".to_string(),
        };
        let pending_rewrites = vec![rewrite];

        let outcome =
            validate_trace_column_rewrites(&rows, &pending_rewrites, "trace_type_atomicity");

        let err = outcome.unwrap_err();
        assert_eq!(err.status_code(), StatusCode::InvalidArguments);
    }

    /// The enriched error mentions both the column name and the existing type.
    #[test]
    fn test_enrich_trace_reconcile_error_includes_existing_type() {
        let observed = [ColumnDataType::String, ColumnDataType::Int64];

        let err = enrich_trace_reconcile_error(
            "trace_type_atomicity",
            "span_attributes.attr_int",
            &observed,
            Some(ColumnDataType::Boolean),
        );

        assert_eq!(err.status_code(), StatusCode::InvalidArguments);
        let message = err.to_string();
        assert!(message.contains("span_attributes.attr_int"));
        assert!(message.contains("Boolean"));
    }

    /// Scalar types are candidates; binary and timestamp types are not.
    #[test]
    fn test_is_trace_reconcile_candidate_type_filters_non_scalar_types() {
        for accepted in [ColumnDataType::String, ColumnDataType::Boolean] {
            assert!(is_trace_reconcile_candidate_type(accepted));
        }
        for rejected in [
            ColumnDataType::Binary,
            ColumnDataType::TimestampMillisecond,
        ] {
            assert!(!is_trace_reconcile_candidate_type(rejected));
        }
    }

    /// Pushing a type twice stores it once, and first-seen order is preserved.
    #[test]
    fn test_push_observed_trace_type_deduplicates_types() {
        let mut observed_types = Vec::new();

        for datatype in [
            ColumnDataType::Int64,
            ColumnDataType::Int64,
            ColumnDataType::Float64,
        ] {
            push_observed_trace_type(&mut observed_types, datatype);
        }

        let expected = vec![ColumnDataType::Int64, ColumnDataType::Float64];
        assert_eq!(observed_types, expected);
    }
}