Skip to main content

frontend/instance/
otlp.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use api::helper::ColumnDataTypeWrapper;
18use api::v1::alter_table_expr::Kind;
19use api::v1::{
20    AlterTableExpr, ColumnDataType, ModifyColumnType, ModifyColumnTypes, RowInsertRequests,
21};
22use async_trait::async_trait;
23use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq};
24use client::Output;
25use common_error::ext::{BoxedError, ErrorExt};
26use common_error::status_code::StatusCode;
27use common_query::prelude::GREPTIME_PHYSICAL_TABLE;
28use common_telemetry::{tracing, warn};
29use itertools::Itertools;
30use opentelemetry_proto::tonic::collector::logs::v1::ExportLogsServiceRequest;
31use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest;
32use otel_arrow_rust::proto::opentelemetry::collector::metrics::v1::ExportMetricsServiceRequest;
33use pipeline::{GreptimePipelineParams, PipelineWay};
34use servers::error::{self, AuthSnafu, Result as ServerResult};
35use servers::http::prom_store::PHYSICAL_TABLE_PARAM;
36use servers::interceptor::{OpenTelemetryProtocolInterceptor, OpenTelemetryProtocolInterceptorRef};
37use servers::otlp;
38use servers::otlp::trace::TraceAuxData;
39use servers::otlp::trace::coerce::{coerce_value_data, trace_value_datatype};
40use servers::otlp::trace::span::{TraceSpan, TraceSpanGroup};
41use servers::query_handler::{
42    OpenTelemetryProtocolHandler, PipelineHandlerRef, TraceIngestOutcome,
43};
44use session::context::QueryContextRef;
45use snafu::{IntoError, ResultExt};
46use table::requests::{OTLP_METRIC_COMPAT_KEY, OTLP_METRIC_COMPAT_PROM};
47
48use crate::instance::Instance;
49use crate::instance::otlp::trace_semconv::trace_semconv_fixed_type;
50use crate::instance::otlp::trace_types::{
51    PendingTraceColumnRewrite, choose_trace_reconcile_decision, enrich_trace_reconcile_error,
52    is_trace_reconcile_candidate_type, push_observed_trace_type, validate_trace_column_rewrites,
53};
54use crate::metrics::{
55    OTLP_LOGS_ROWS, OTLP_METRICS_ROWS, OTLP_TRACES_FAILURE_COUNT, OTLP_TRACES_ROWS,
56};
57
// Trace-specific helpers for semantic-convention fixed types and
// column-type reconciliation, used by the ingest path below.
pub mod trace_semconv;
pub mod trace_types;

/// Number of spans written per insert request; bounds the blast radius of a
/// failed chunk so the span-by-span fallback stays cheap.
const TRACE_INGEST_CHUNK_SIZE: usize = 64;
/// Maximum number of failure detail strings kept for the ingest outcome;
/// further failures are still counted in metrics but their messages are dropped.
const TRACE_FAILURE_MESSAGE_LIMIT: usize = 4;
63
/// How a failed chunk-level trace insert is handled.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ChunkFailureReaction {
    /// Deterministic failure: retry every span of the chunk individually.
    RetryPerSpan,
    /// Ambiguous failure: drop the chunk and count its spans as rejected.
    DiscardChunk,
    /// Retryable failure: surface the error to the caller unchanged.
    Propagate,
}

impl ChunkFailureReaction {
    /// Label used for the failure-count metric and failure messages.
    fn as_metric_label(self) -> &'static str {
        match self {
            ChunkFailureReaction::DiscardChunk => "discard_chunk",
            ChunkFailureReaction::Propagate => "propagate_failure",
            ChunkFailureReaction::RetryPerSpan => "retry_per_span",
        }
    }
}
80
/// Borrowed, per-request context shared by every chunk-level trace insert so
/// each chunk call does not re-pass the same arguments.
struct TraceChunkIngestContext<'a> {
    /// Handler used when converting spans into gRPC insert requests.
    pipeline_handler: PipelineHandlerRef,
    /// Pipeline selected for this request.
    pipeline: &'a PipelineWay,
    /// Extra pipeline parameters forwarded to span conversion.
    pipeline_params: &'a GreptimePipelineParams,
    /// Target main trace table name.
    table_name: &'a str,
    /// True when the pipeline is `PipelineWay::OtlpTraceDirectV1`; enables
    /// column-type reconciliation before inserting.
    is_trace_v1_model: bool,
}
88
/// Mutable state accumulated across all chunks of a single trace ingest request.
struct TraceIngestState {
    /// Auxiliary-table data gathered only from spans whose main-table write
    /// was confirmed.
    aux_data: TraceAuxData,
    /// Accepted/rejected span counts, accumulated write cost, and the final
    /// error message (if any).
    outcome: TraceIngestOutcome,
    /// Failure details, capped at `TRACE_FAILURE_MESSAGE_LIMIT` entries.
    failure_messages: Vec<String>,
}
94
95#[async_trait]
96impl OpenTelemetryProtocolHandler for Instance {
97    #[tracing::instrument(skip_all)]
98    async fn metrics(
99        &self,
100        request: ExportMetricsServiceRequest,
101        ctx: QueryContextRef,
102    ) -> ServerResult<Output> {
103        self.plugins
104            .get::<PermissionCheckerRef>()
105            .as_ref()
106            .check_permission(ctx.current_user(), PermissionReq::Otlp)
107            .context(AuthSnafu)?;
108
109        let interceptor_ref = self
110            .plugins
111            .get::<OpenTelemetryProtocolInterceptorRef<servers::error::Error>>();
112        interceptor_ref.pre_execute(ctx.clone())?;
113
114        let input_names = request
115            .resource_metrics
116            .iter()
117            .flat_map(|r| r.scope_metrics.iter())
118            .flat_map(|s| s.metrics.iter().map(|m| &m.name))
119            .collect::<Vec<_>>();
120
121        // See [`OtlpMetricCtx`] for details
122        let is_legacy = self.check_otlp_legacy(&input_names, ctx.clone()).await?;
123
124        let mut metric_ctx = ctx
125            .protocol_ctx()
126            .get_otlp_metric_ctx()
127            .cloned()
128            .unwrap_or_default();
129        metric_ctx.is_legacy = is_legacy;
130
131        let (requests, rows) = otlp::metrics::to_grpc_insert_requests(request, &mut metric_ctx)?;
132        OTLP_METRICS_ROWS.inc_by(rows as u64);
133
134        let ctx = if !is_legacy {
135            let mut c = (*ctx).clone();
136            c.set_extension(OTLP_METRIC_COMPAT_KEY, OTLP_METRIC_COMPAT_PROM.to_string());
137            Arc::new(c)
138        } else {
139            ctx
140        };
141
142        // If the user uses the legacy path, it is by default without metric engine.
143        if metric_ctx.is_legacy || !metric_ctx.with_metric_engine {
144            self.handle_row_inserts(requests, ctx, false, false)
145                .await
146                .map_err(BoxedError::new)
147                .context(error::ExecuteGrpcQuerySnafu)
148        } else {
149            let physical_table = ctx
150                .extension(PHYSICAL_TABLE_PARAM)
151                .unwrap_or(GREPTIME_PHYSICAL_TABLE)
152                .to_string();
153            self.handle_metric_row_inserts(requests, ctx, physical_table.clone())
154                .await
155                .map_err(BoxedError::new)
156                .context(error::ExecuteGrpcQuerySnafu)
157        }
158    }
159
160    #[tracing::instrument(skip_all)]
161    async fn traces(
162        &self,
163        pipeline_handler: PipelineHandlerRef,
164        request: ExportTraceServiceRequest,
165        pipeline: PipelineWay,
166        pipeline_params: GreptimePipelineParams,
167        table_name: String,
168        ctx: QueryContextRef,
169    ) -> ServerResult<TraceIngestOutcome> {
170        self.plugins
171            .get::<PermissionCheckerRef>()
172            .as_ref()
173            .check_permission(ctx.current_user(), PermissionReq::Otlp)
174            .context(AuthSnafu)?;
175
176        let interceptor_ref = self
177            .plugins
178            .get::<OpenTelemetryProtocolInterceptorRef<servers::error::Error>>();
179        interceptor_ref.pre_execute(ctx.clone())?;
180
181        let spans = otlp::trace::span::parse(request);
182        self.ingest_trace_spans(
183            pipeline_handler,
184            &pipeline,
185            &pipeline_params,
186            table_name,
187            spans,
188            ctx,
189        )
190        .await
191    }
192
193    #[tracing::instrument(skip_all)]
194    async fn logs(
195        &self,
196        pipeline_handler: PipelineHandlerRef,
197        request: ExportLogsServiceRequest,
198        pipeline: PipelineWay,
199        pipeline_params: GreptimePipelineParams,
200        table_name: String,
201        ctx: QueryContextRef,
202    ) -> ServerResult<Vec<Output>> {
203        self.plugins
204            .get::<PermissionCheckerRef>()
205            .as_ref()
206            .check_permission(ctx.current_user(), PermissionReq::Otlp)
207            .context(AuthSnafu)?;
208
209        let interceptor_ref = self
210            .plugins
211            .get::<OpenTelemetryProtocolInterceptorRef<servers::error::Error>>();
212        interceptor_ref.pre_execute(ctx.clone())?;
213
214        let opt_req = otlp::logs::to_grpc_insert_requests(
215            request,
216            pipeline,
217            pipeline_params,
218            table_name,
219            &ctx,
220            pipeline_handler,
221        )
222        .await?;
223
224        let mut outputs = vec![];
225
226        for (temp_ctx, requests) in opt_req.as_req_iter(ctx) {
227            let cnt = requests
228                .inserts
229                .iter()
230                .filter_map(|r| r.rows.as_ref().map(|r| r.rows.len()))
231                .sum::<usize>();
232
233            let o = self
234                .handle_log_inserts(requests, temp_ctx)
235                .await
236                .inspect(|_| OTLP_LOGS_ROWS.inc_by(cnt as u64))
237                .map_err(BoxedError::new)
238                .context(error::ExecuteGrpcQuerySnafu)?;
239            outputs.push(o);
240        }
241
242        Ok(outputs)
243    }
244}
245
impl Instance {
    /// Ingest OTLP trace spans with chunk-level writes and span-level fallback on
    /// deterministic chunk failures.
    ///
    /// Returns a [`TraceIngestOutcome`] carrying accepted/rejected span counts,
    /// the accumulated write cost, and an optional failure summary message.
    async fn ingest_trace_spans(
        &self,
        pipeline_handler: PipelineHandlerRef,
        pipeline: &PipelineWay,
        pipeline_params: &GreptimePipelineParams,
        table_name: String,
        groups: Vec<TraceSpanGroup>,
        ctx: QueryContextRef,
    ) -> ServerResult<TraceIngestOutcome> {
        let is_trace_v1_model = matches!(pipeline, PipelineWay::OtlpTraceDirectV1);
        let ingest_ctx = TraceChunkIngestContext {
            pipeline_handler,
            pipeline,
            pipeline_params,
            table_name: &table_name,
            is_trace_v1_model,
        };
        let mut ingest_state = TraceIngestState {
            aux_data: TraceAuxData::default(),
            outcome: TraceIngestOutcome::default(),
            failure_messages: Vec::new(),
        };

        for group in groups {
            // Split each group into bounded chunks so a single failure affects
            // at most TRACE_INGEST_CHUNK_SIZE spans.
            let chunks = group
                .spans
                .into_iter()
                .chunks(TRACE_INGEST_CHUNK_SIZE)
                .into_iter()
                .map(|chunk| chunk.collect::<Vec<_>>())
                .collect::<Vec<_>>();
            for chunk in chunks {
                self.ingest_trace_chunk(&ingest_ctx, chunk, ctx.clone(), &mut ingest_state)
                    .await?;
            }
        }

        OTLP_TRACES_ROWS.inc_by(ingest_state.outcome.accepted_spans as u64);

        if !ingest_state.aux_data.is_empty() {
            // Auxiliary trace tables are derived from spans whose main-table
            // writes are already confirmed, so they never create new accepted
            // spans and they do not affect rejected span counts.
            let (aux_requests, _) = otlp::trace::to_grpc_insert_requests_for_aux_tables(
                std::mem::take(&mut ingest_state.aux_data),
                ingest_ctx.pipeline,
                ingest_ctx.table_name,
            )?;

            if !aux_requests.inserts.is_empty() {
                match self
                    .insert_trace_requests(aux_requests, ingest_ctx.is_trace_v1_model, ctx)
                    .await
                {
                    Ok(output) => {
                        Self::add_trace_write_cost(&mut ingest_state.outcome, output.meta.cost);
                    }
                    Err(err) => {
                        // Aux-table failures degrade to a recorded message rather
                        // than failing the request: the main-table data landed.
                        Self::push_trace_failure_message(
                            &mut ingest_state.failure_messages,
                            "aux_table_update_failed",
                            format!(
                                "Auxiliary trace tables were not fully updated ({})",
                                err.status_code().as_ref()
                            ),
                        );
                    }
                }
            }
        }

        ingest_state.outcome.error_message = Self::finish_trace_failure_message(
            ingest_state.outcome.accepted_spans,
            ingest_state.outcome.rejected_spans,
            ingest_state.failure_messages,
        );

        Ok(ingest_state.outcome)
    }

    /// Ingest one owned trace chunk so successful spans can be moved into the
    /// accepted set without extra cloning.
    ///
    /// On failure the chunk either falls back to per-span retry, is discarded,
    /// or the error is propagated, depending on
    /// [`Instance::classify_trace_chunk_failure`].
    async fn ingest_trace_chunk(
        &self,
        ingest_ctx: &TraceChunkIngestContext<'_>,
        chunk: Vec<TraceSpan>,
        ctx: QueryContextRef,
        ingest_state: &mut TraceIngestState,
    ) -> ServerResult<()> {
        // Try the fast path first so healthy batches keep their original
        // throughput and write amplification stays low.
        let (requests, chunk_rows) = otlp::trace::to_grpc_insert_requests_from_spans(
            &chunk,
            ingest_ctx.pipeline,
            ingest_ctx.pipeline_params,
            ingest_ctx.table_name,
            &ctx,
            ingest_ctx.pipeline_handler.clone(),
        )?;

        match self
            .insert_trace_requests(requests, ingest_ctx.is_trace_v1_model, ctx.clone())
            .await
        {
            Ok(output) => {
                Self::add_trace_write_cost(&mut ingest_state.outcome, output.meta.cost);
                ingest_state.outcome.accepted_spans += chunk_rows;
                // Only confirmed-written spans contribute aux-table data.
                for span in &chunk {
                    ingest_state.aux_data.observe_span(span);
                }
            }
            Err(err) => match Self::classify_trace_chunk_failure(err.status_code()) {
                ChunkFailureReaction::RetryPerSpan => {
                    Self::push_trace_failure_message(
                        &mut ingest_state.failure_messages,
                        ChunkFailureReaction::RetryPerSpan.as_metric_label(),
                        format!("Chunk fallback triggered by {}", err.status_code().as_ref()),
                    );
                    // Only deterministic failures are retried span by span.
                    // This includes schemaless table or column creation paths for
                    // trace ingestion. Ambiguous failures are handled below
                    // without retrying because the chunk may already have been
                    // ingested.
                    self.ingest_trace_chunk_span_by_span(
                        ingest_ctx,
                        chunk,
                        ctx.clone(),
                        ingest_state,
                    )
                    .await?;
                }
                ChunkFailureReaction::DiscardChunk => {
                    ingest_state.outcome.rejected_spans += chunk.len();
                    Self::push_trace_failure_message(
                        &mut ingest_state.failure_messages,
                        ChunkFailureReaction::DiscardChunk.as_metric_label(),
                        format!(
                            "Discarded {} spans after ambiguous chunk failure ({})",
                            chunk.len(),
                            err.status_code().as_ref()
                        ),
                    );
                    // TODO(shuiyisong): Add an idempotent retry-safe recovery path for
                    // ambiguous chunk failures such as timeout-like errors.
                }
                // Retryable or ambiguous failures must fail the request instead of
                // becoming partial success. This path is not retry-safe because the
                // chunk may already have been committed before the error surfaced.
                ChunkFailureReaction::Propagate => {
                    Self::push_trace_failure_message(
                        &mut ingest_state.failure_messages,
                        ChunkFailureReaction::Propagate.as_metric_label(),
                        format!(
                            "Propagating retryable chunk failure ({})",
                            err.status_code().as_ref()
                        ),
                    );
                    return Err(err);
                }
            },
        }

        Ok(())
    }

    /// Retry spans one by one only after a deterministic chunk failure.
    ///
    /// Each span gets its own insert; failed spans are counted as rejected
    /// unless the failure is classified as retryable, in which case the error
    /// is propagated (the request must not become partial success).
    async fn ingest_trace_chunk_span_by_span(
        &self,
        ingest_ctx: &TraceChunkIngestContext<'_>,
        chunk: Vec<TraceSpan>,
        ctx: QueryContextRef,
        ingest_state: &mut TraceIngestState,
    ) -> ServerResult<()> {
        for span in chunk {
            let (requests, rows) = otlp::trace::to_grpc_insert_requests_from_spans(
                std::slice::from_ref(&span),
                ingest_ctx.pipeline,
                ingest_ctx.pipeline_params,
                ingest_ctx.table_name,
                &ctx,
                ingest_ctx.pipeline_handler.clone(),
            )?;

            match self
                .insert_trace_requests(requests, ingest_ctx.is_trace_v1_model, ctx.clone())
                .await
            {
                Ok(output) => {
                    Self::add_trace_write_cost(&mut ingest_state.outcome, output.meta.cost);
                    ingest_state.outcome.accepted_spans += rows;
                    ingest_state.aux_data.observe_span(&span);
                }
                Err(err) => {
                    if Self::should_propagate_trace_span_failure(err.status_code()) {
                        Self::push_trace_failure_message(
                            &mut ingest_state.failure_messages,
                            ChunkFailureReaction::Propagate.as_metric_label(),
                            format!(
                                "Propagating retryable span failure for {}:{} ({})",
                                span.trace_id,
                                span.span_id,
                                err.status_code().as_ref()
                            ),
                        );
                        return Err(err);
                    }

                    ingest_state.outcome.rejected_spans += 1;
                    Self::push_trace_failure_message(
                        &mut ingest_state.failure_messages,
                        "span_rejected",
                        format!(
                            "Rejected span {}:{} ({})",
                            span.trace_id,
                            span.span_id,
                            err.status_code().as_ref()
                        ),
                    );
                }
            }
        }

        Ok(())
    }

    /// Reconcile and insert one trace request batch.
    ///
    /// Column-type reconciliation only applies to the trace v1 data model;
    /// other pipelines go through the plain log-insert path.
    async fn insert_trace_requests(
        &self,
        mut requests: RowInsertRequests,
        is_trace_v1_model: bool,
        ctx: QueryContextRef,
    ) -> ServerResult<Output> {
        if is_trace_v1_model {
            self.reconcile_trace_column_types(&mut requests, &ctx)
                .await?;
            self.handle_trace_inserts(requests, ctx)
                .await
                .map_err(BoxedError::new)
                .context(error::ExecuteGrpcQuerySnafu)
        } else {
            self.handle_log_inserts(requests, ctx)
                .await
                .map_err(BoxedError::new)
                .context(error::ExecuteGrpcQuerySnafu)
        }
    }

    /// Map an insert failure status to a chunk-level reaction: deterministic
    /// request errors retry per span, ambiguous errors discard the chunk, and
    /// cancellation/deadline or otherwise retryable errors propagate so the
    /// caller can retry the whole request.
    fn classify_trace_chunk_failure(status: StatusCode) -> ChunkFailureReaction {
        match status {
            StatusCode::InvalidArguments
            | StatusCode::InvalidSyntax
            | StatusCode::Unsupported
            | StatusCode::TableNotFound
            | StatusCode::TableColumnNotFound => ChunkFailureReaction::RetryPerSpan,
            StatusCode::DatabaseNotFound => ChunkFailureReaction::DiscardChunk,
            StatusCode::Cancelled | StatusCode::DeadlineExceeded => ChunkFailureReaction::Propagate,
            _ if status.is_retryable() => ChunkFailureReaction::Propagate,
            _ => ChunkFailureReaction::DiscardChunk,
        }
    }

    /// Whether a failure during per-span retry must abort the whole request
    /// (i.e. the status classifies as [`ChunkFailureReaction::Propagate`]).
    fn should_propagate_trace_span_failure(status: StatusCode) -> bool {
        matches!(
            Self::classify_trace_chunk_failure(status),
            ChunkFailureReaction::Propagate
        )
    }

    /// Accumulate the write cost reported by a successful insert into the outcome.
    fn add_trace_write_cost(outcome: &mut TraceIngestOutcome, cost: usize) {
        outcome.write_cost += cost;
    }

    /// Record one ingest failure: always increments the per-label failure
    /// metric, keeps at most `TRACE_FAILURE_MESSAGE_LIMIT` detail strings, and
    /// emits a debug log for each message suppressed once the cap is reached.
    fn push_trace_failure_message(messages: &mut Vec<String>, label: &str, message: String) {
        OTLP_TRACES_FAILURE_COUNT.with_label_values(&[label]).inc();

        if messages.len() < TRACE_FAILURE_MESSAGE_LIMIT {
            messages.push(message);
        } else if messages.len() == TRACE_FAILURE_MESSAGE_LIMIT {
            tracing::debug!(
                label,
                limit = TRACE_FAILURE_MESSAGE_LIMIT,
                "Trace ingest failure message limit reached; suppressing additional failure details"
            );
        }
    }

    /// Build the final outcome message: `None` when nothing was rejected and no
    /// failures were recorded; otherwise an accepted/rejected summary followed
    /// by the collected failure details.
    fn finish_trace_failure_message(
        accepted_spans: usize,
        rejected_spans: usize,
        messages: Vec<String>,
    ) -> Option<String> {
        if rejected_spans == 0 && messages.is_empty() {
            return None;
        }

        let mut summary = format!(
            "Accepted {} spans, rejected {} spans",
            accepted_spans, rejected_spans
        );

        if !messages.is_empty() {
            summary.push_str(": ");
            summary.push_str(&messages.join("; "));
        }

        Some(summary)
    }

    /// Widen existing trace table columns to Float64 before request rewrite.
    ///
    /// When the alter fails, the live table schema is re-read: if every target
    /// column is already Float64 (e.g. a concurrent writer applied the same
    /// alter first), the failure is treated as success, making the operation
    /// effectively idempotent.
    async fn alter_trace_table_columns_to_float64(
        &self,
        ctx: &QueryContextRef,
        table_name: &str,
        column_names: &[String],
    ) -> ServerResult<()> {
        let catalog_name = ctx.current_catalog().to_string();
        let schema_name = ctx.current_schema();
        let alter_expr = AlterTableExpr {
            catalog_name: catalog_name.clone(),
            schema_name: schema_name.clone(),
            table_name: table_name.to_string(),
            kind: Some(Kind::ModifyColumnTypes(ModifyColumnTypes {
                modify_column_types: column_names
                    .iter()
                    .map(|column_name| ModifyColumnType {
                        column_name: column_name.clone(),
                        target_type: ColumnDataType::Float64 as i32,
                        target_type_extension: None,
                    })
                    .collect(),
            })),
        };

        if let Err(err) = self
            .statement_executor
            .alter_table_inner(alter_expr, ctx.clone())
            .await
        {
            // Check whether the desired schema is already in place before
            // surfacing the alter failure.
            let table = self
                .catalog_manager
                .table(&catalog_name, &schema_name, table_name, None)
                .await
                .map_err(servers::error::Error::from)?;
            let alter_already_applied = table
                .map(|table| {
                    let table_schema = table.schema();
                    column_names.iter().all(|column_name| {
                        table_schema
                            .column_schema_by_name(column_name)
                            .and_then(|table_col| {
                                ColumnDataTypeWrapper::try_from(table_col.data_type.clone())
                                    .ok()
                                    .map(|wrapper| wrapper.datatype())
                            })
                            == Some(ColumnDataType::Float64)
                    })
                })
                .unwrap_or(false);

            if alter_already_applied {
                return Ok(());
            }

            warn!(
                table_name,
                columns = ?column_names,
                error = %err,
                "failed to widen trace columns before insert"
            );

            return Err(wrap_trace_alter_failure(err));
        }

        Ok(())
    }

    /// Coerce request column types and values to match the existing table schema
    /// for compatible type pairs. Existing table schema wins when present;
    /// otherwise the full request batch decides a stable target type.
    async fn reconcile_trace_column_types(
        &self,
        requests: &mut RowInsertRequests,
        ctx: &QueryContextRef,
    ) -> ServerResult<()> {
        let catalog = ctx.current_catalog();
        let schema = ctx.current_schema();

        for req in &mut requests.inserts {
            let table = self
                .catalog_manager
                .table(catalog, &schema, &req.table_name, None)
                .await?;

            // Requests with no rows have nothing to reconcile.
            let Some(rows) = req.rows.as_mut() else {
                continue;
            };

            let table_schema = table.map(|table| table.schema());
            let mut pending_rewrites = Vec::new();
            let mut pending_alter_columns = Vec::new();

            for (col_idx, col_schema) in rows.schema.iter().enumerate() {
                // Skip columns whose declared datatype is unknown to this build.
                let Some(current_type) = ColumnDataType::try_from(col_schema.datatype).ok() else {
                    continue;
                };

                let mut observed_types = Vec::new();
                push_observed_trace_type(&mut observed_types, current_type);

                // Scan the full request first so the final type decision is not affected
                // by row order inside the batch.
                for row in &rows.rows {
                    let Some(value) = row
                        .values
                        .get(col_idx)
                        .and_then(|value| value.value_data.as_ref())
                    else {
                        continue;
                    };

                    let Some(value_type) = trace_value_datatype(value) else {
                        continue;
                    };
                    push_observed_trace_type(&mut observed_types, value_type);
                }

                // Existing table column type (if the table and column exist).
                let existing_type = table_schema
                    .as_ref()
                    .and_then(|schema| schema.column_schema_by_name(&col_schema.column_name))
                    .and_then(|table_col| {
                        ColumnDataTypeWrapper::try_from(table_col.data_type.clone())
                            .ok()
                            .map(|wrapper| wrapper.datatype())
                    });
                let fixed_type = trace_semconv_fixed_type(&col_schema.column_name);

                // Skip columns where neither the observed values, the existing
                // column, nor a semantic-convention fixed type calls for
                // reconciliation.
                if !observed_types
                    .iter()
                    .copied()
                    .any(is_trace_reconcile_candidate_type)
                    && existing_type
                        .map(|datatype| !is_trace_reconcile_candidate_type(datatype))
                        .unwrap_or(true)
                    && fixed_type.is_none()
                {
                    continue;
                }

                // Decide the final type once per column, then rewrite all affected cells
                // together in one row pass below.
                let Some(decision) = choose_trace_reconcile_decision(
                    &col_schema.column_name,
                    &observed_types,
                    existing_type,
                )
                .map_err(|_| {
                    enrich_trace_reconcile_error(
                        &req.table_name,
                        &col_schema.column_name,
                        &observed_types,
                        existing_type,
                        fixed_type,
                    )
                })?
                else {
                    continue;
                };
                let target_type = decision.target_type();

                // Nothing to do when no alter is required and every observed
                // value plus the declared schema already match the target.
                if !decision.requires_alter()
                    && observed_types
                        .iter()
                        .all(|observed| *observed == target_type)
                    && col_schema.datatype == target_type as i32
                {
                    continue;
                }

                if decision.requires_alter()
                    && !pending_alter_columns.contains(&col_schema.column_name)
                {
                    pending_alter_columns.push(col_schema.column_name.clone());
                }

                pending_rewrites.push(PendingTraceColumnRewrite {
                    col_idx,
                    target_type,
                    column_name: col_schema.column_name.clone(),
                });
            }

            if pending_rewrites.is_empty() {
                continue;
            }

            // Fail fast on incompatible cell values before touching the table.
            validate_trace_column_rewrites(&rows.rows, &pending_rewrites, &req.table_name)?;

            if !pending_alter_columns.is_empty() {
                self.alter_trace_table_columns_to_float64(
                    ctx,
                    &req.table_name,
                    &pending_alter_columns,
                )
                .await?;
            }

            // Update schema metadata before mutating row values so both stay in sync.
            for pending_rewrite in &pending_rewrites {
                rows.schema[pending_rewrite.col_idx].datatype = pending_rewrite.target_type as i32;
            }

            // Apply all pending column rewrites in one row pass.
            for row in &mut rows.rows {
                for pending_rewrite in &pending_rewrites {
                    let Some(value) = row.values.get_mut(pending_rewrite.col_idx) else {
                        continue;
                    };
                    let Some(request_type) =
                        value.value_data.as_ref().and_then(trace_value_datatype)
                    else {
                        continue;
                    };
                    if request_type == pending_rewrite.target_type {
                        continue;
                    }

                    value.value_data = coerce_value_data(
                        &value.value_data,
                        pending_rewrite.target_type,
                        request_type,
                    )
                    .map_err(|_| {
                        error::InvalidParameterSnafu {
                            reason: format!(
                                "failed to coerce trace column '{}' in table '{}' from {:?} to {:?}",
                                pending_rewrite.column_name,
                                req.table_name,
                                request_type,
                                pending_rewrite.target_type
                            ),
                        }
                        .build()
                    })?;
                }
            }
        }

        Ok(())
    }
}
799
800/// Preserve the original alter failure status so chunk retry behavior stays correct.
801fn wrap_trace_alter_failure<E>(err: E) -> servers::error::Error
802where
803    E: ErrorExt + Send + Sync + 'static,
804{
805    error::ExecuteGrpcQuerySnafu.into_error(BoxedError::new(err))
806}
807
#[cfg(test)]
mod tests {
    use common_error::ext::ErrorExt;
    use common_error::status_code::StatusCode;
    use servers::query_handler::TraceIngestOutcome;

    use super::{ChunkFailureReaction, Instance, wrap_trace_alter_failure};
    use crate::metrics::OTLP_TRACES_FAILURE_COUNT;

    /// Chunk-level failure classification, table-driven over every covered code.
    #[test]
    fn test_classify_trace_chunk_failure() {
        let cases = [
            (StatusCode::InvalidArguments, ChunkFailureReaction::RetryPerSpan),
            (StatusCode::InvalidSyntax, ChunkFailureReaction::RetryPerSpan),
            (StatusCode::Unsupported, ChunkFailureReaction::RetryPerSpan),
            (
                StatusCode::TableColumnNotFound,
                ChunkFailureReaction::RetryPerSpan,
            ),
            (StatusCode::TableNotFound, ChunkFailureReaction::RetryPerSpan),
            (
                StatusCode::DatabaseNotFound,
                ChunkFailureReaction::DiscardChunk,
            ),
            (StatusCode::DeadlineExceeded, ChunkFailureReaction::Propagate),
            (StatusCode::Cancelled, ChunkFailureReaction::Propagate),
            (
                StatusCode::StorageUnavailable,
                ChunkFailureReaction::Propagate,
            ),
            (StatusCode::Internal, ChunkFailureReaction::Propagate),
            (StatusCode::RegionNotReady, ChunkFailureReaction::Propagate),
            (StatusCode::TableUnavailable, ChunkFailureReaction::Propagate),
            (StatusCode::RegionBusy, ChunkFailureReaction::Propagate),
            (
                StatusCode::RuntimeResourcesExhausted,
                ChunkFailureReaction::Propagate,
            ),
        ];
        for (code, expected) in cases {
            assert_eq!(
                Instance::classify_trace_chunk_failure(code),
                expected,
                "unexpected reaction for {code:?}"
            );
        }
    }

    /// Span-level failures only propagate for resource/availability codes.
    #[test]
    fn test_classify_trace_span_failure() {
        for code in [StatusCode::DeadlineExceeded, StatusCode::StorageUnavailable] {
            assert!(Instance::should_propagate_trace_span_failure(code));
        }
        assert!(!Instance::should_propagate_trace_span_failure(
            StatusCode::InvalidArguments
        ));
    }

    /// Write costs accumulate across successive calls.
    #[test]
    fn test_add_trace_write_cost() {
        let mut outcome = TraceIngestOutcome::default();
        for cost in [3, 5] {
            Instance::add_trace_write_cost(&mut outcome, cost);
        }
        assert_eq!(outcome.write_cost, 8);
    }

    /// A mixed accept/reject result yields a summary plus the detail messages.
    #[test]
    fn test_finish_trace_failure_message() {
        let details = vec!["Rejected span trace:span (InvalidArguments)".to_string()];
        let message = Instance::finish_trace_failure_message(3, 2, details)
            .expect("rejected spans must produce a failure message");
        assert!(message.contains("Accepted 3 spans, rejected 2 spans"));
        assert!(message.contains("Rejected span trace:span"));

        // No rejections at all -> no message.
        assert_eq!(Instance::finish_trace_failure_message(2, 0, vec![]), None);
    }

    /// Rejections without detail messages still produce the count summary.
    #[test]
    fn test_finish_trace_failure_message_without_detail_messages() {
        let message = Instance::finish_trace_failure_message(0, 2, vec![]);
        assert_eq!(
            message,
            Some("Accepted 0 spans, rejected 2 spans".to_string())
        );
    }

    /// Pushing a failure message bumps the labeled failure counter by one.
    #[test]
    fn test_push_trace_failure_message_increments_labeled_counter() {
        let label = "retry_per_span_counter_test";
        let before = OTLP_TRACES_FAILURE_COUNT.with_label_values(&[label]).get();
        let mut recorded = Vec::new();

        Instance::push_trace_failure_message(
            &mut recorded,
            label,
            "Chunk fallback triggered by InvalidArguments".to_string(),
        );

        assert_eq!(recorded.len(), 1);
        let after = OTLP_TRACES_FAILURE_COUNT.with_label_values(&[label]).get();
        assert_eq!(after, before + 1);
    }

    /// Only the first four messages are retained; later ones are dropped.
    #[test]
    fn test_push_trace_failure_message_caps_recorded_messages() {
        let label = "retry_per_span_limit_test";
        let mut recorded = Vec::new();

        // Push five messages; the cap keeps only the first four.
        for idx in 0..=4 {
            Instance::push_trace_failure_message(&mut recorded, label, format!("failure-{idx}"));
        }

        let expected: Vec<String> = (0..4).map(|idx| format!("failure-{idx}")).collect();
        assert_eq!(recorded, expected);
    }

    /// Codes not covered by an explicit rule discard the chunk.
    #[test]
    fn test_classify_trace_chunk_failure_defaults_to_discard() {
        let reaction = Instance::classify_trace_chunk_failure(StatusCode::Unknown);
        assert_eq!(reaction, ChunkFailureReaction::DiscardChunk);
    }

    /// Wrapping must not mask the source error's status code.
    #[test]
    fn test_wrap_trace_alter_failure_preserves_status_code() {
        let source = servers::error::TableNotFoundSnafu {
            catalog: "greptime".to_string(),
            schema: "public".to_string(),
            table: "trace_type_missing".to_string(),
        }
        .build();

        let wrapped = wrap_trace_alter_failure(source);
        assert_eq!(wrapped.status_code(), StatusCode::TableNotFound);
    }
}
981}