1use std::sync::Arc;
16
17use api::helper::ColumnDataTypeWrapper;
18use api::v1::alter_table_expr::Kind;
19use api::v1::{
20 AlterTableExpr, ColumnDataType, ModifyColumnType, ModifyColumnTypes, RowInsertRequests,
21};
22use async_trait::async_trait;
23use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq};
24use client::Output;
25use common_error::ext::{BoxedError, ErrorExt};
26use common_error::status_code::StatusCode;
27use common_query::prelude::GREPTIME_PHYSICAL_TABLE;
28use common_telemetry::{tracing, warn};
29use itertools::Itertools;
30use opentelemetry_proto::tonic::collector::logs::v1::ExportLogsServiceRequest;
31use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest;
32use otel_arrow_rust::proto::opentelemetry::collector::metrics::v1::ExportMetricsServiceRequest;
33use pipeline::{GreptimePipelineParams, PipelineWay};
34use servers::error::{self, AuthSnafu, Result as ServerResult};
35use servers::http::prom_store::PHYSICAL_TABLE_PARAM;
36use servers::interceptor::{OpenTelemetryProtocolInterceptor, OpenTelemetryProtocolInterceptorRef};
37use servers::otlp;
38use servers::otlp::trace::TraceAuxData;
39use servers::otlp::trace::coerce::{coerce_value_data, trace_value_datatype};
40use servers::otlp::trace::span::{TraceSpan, TraceSpanGroup};
41use servers::query_handler::{
42 OpenTelemetryProtocolHandler, PipelineHandlerRef, TraceIngestOutcome,
43};
44use session::context::QueryContextRef;
45use snafu::{IntoError, ResultExt};
46use table::requests::{OTLP_METRIC_COMPAT_KEY, OTLP_METRIC_COMPAT_PROM};
47
48use crate::instance::Instance;
49use crate::instance::otlp::trace_semconv::trace_semconv_fixed_type;
50use crate::instance::otlp::trace_types::{
51 PendingTraceColumnRewrite, choose_trace_reconcile_decision, enrich_trace_reconcile_error,
52 is_trace_reconcile_candidate_type, push_observed_trace_type, validate_trace_column_rewrites,
53};
54use crate::metrics::{
55 OTLP_LOGS_ROWS, OTLP_METRICS_ROWS, OTLP_TRACES_FAILURE_COUNT, OTLP_TRACES_ROWS,
56};
57
58pub mod trace_semconv;
59pub mod trace_types;
60
/// Upper bound on the number of detail messages kept for the partial-failure summary.
const TRACE_FAILURE_MESSAGE_LIMIT: usize = 4;
/// Number of spans converted and inserted together as one batch.
const TRACE_INGEST_CHUNK_SIZE: usize = 64;
63
/// What the trace ingest path does after a chunk insert fails.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ChunkFailureReaction {
    /// Insert the chunk again one span at a time, rejecting only spans that fail.
    RetryPerSpan,
    /// Count every span of the chunk as rejected and move on to the next chunk.
    DiscardChunk,
    /// Stop ingesting and return the error to the caller.
    Propagate,
}

impl ChunkFailureReaction {
    /// Label value used for this reaction on the trace failure counter.
    fn as_metric_label(self) -> &'static str {
        match self {
            ChunkFailureReaction::Propagate => "propagate_failure",
            ChunkFailureReaction::DiscardChunk => "discard_chunk",
            ChunkFailureReaction::RetryPerSpan => "retry_per_span",
        }
    }
}
80
81struct TraceChunkIngestContext<'a> {
82 pipeline_handler: PipelineHandlerRef,
83 pipeline: &'a PipelineWay,
84 pipeline_params: &'a GreptimePipelineParams,
85 table_name: &'a str,
86 is_trace_v1_model: bool,
87}
88
89struct TraceIngestState {
90 aux_data: TraceAuxData,
91 outcome: TraceIngestOutcome,
92 failure_messages: Vec<String>,
93}
94
95#[async_trait]
96impl OpenTelemetryProtocolHandler for Instance {
97 #[tracing::instrument(skip_all)]
98 async fn metrics(
99 &self,
100 request: ExportMetricsServiceRequest,
101 ctx: QueryContextRef,
102 ) -> ServerResult<Output> {
103 self.plugins
104 .get::<PermissionCheckerRef>()
105 .as_ref()
106 .check_permission(ctx.current_user(), PermissionReq::Otlp)
107 .context(AuthSnafu)?;
108
109 let interceptor_ref = self
110 .plugins
111 .get::<OpenTelemetryProtocolInterceptorRef<servers::error::Error>>();
112 interceptor_ref.pre_execute(ctx.clone())?;
113
114 let input_names = request
115 .resource_metrics
116 .iter()
117 .flat_map(|r| r.scope_metrics.iter())
118 .flat_map(|s| s.metrics.iter().map(|m| &m.name))
119 .collect::<Vec<_>>();
120
121 let is_legacy = self.check_otlp_legacy(&input_names, ctx.clone()).await?;
123
124 let mut metric_ctx = ctx
125 .protocol_ctx()
126 .get_otlp_metric_ctx()
127 .cloned()
128 .unwrap_or_default();
129 metric_ctx.is_legacy = is_legacy;
130
131 let (requests, rows) = otlp::metrics::to_grpc_insert_requests(request, &mut metric_ctx)?;
132 OTLP_METRICS_ROWS.inc_by(rows as u64);
133
134 let ctx = if !is_legacy {
135 let mut c = (*ctx).clone();
136 c.set_extension(OTLP_METRIC_COMPAT_KEY, OTLP_METRIC_COMPAT_PROM.to_string());
137 Arc::new(c)
138 } else {
139 ctx
140 };
141
142 if metric_ctx.is_legacy || !metric_ctx.with_metric_engine {
144 self.handle_row_inserts(requests, ctx, false, false)
145 .await
146 .map_err(BoxedError::new)
147 .context(error::ExecuteGrpcQuerySnafu)
148 } else {
149 let physical_table = ctx
150 .extension(PHYSICAL_TABLE_PARAM)
151 .unwrap_or(GREPTIME_PHYSICAL_TABLE)
152 .to_string();
153 self.handle_metric_row_inserts(requests, ctx, physical_table.clone())
154 .await
155 .map_err(BoxedError::new)
156 .context(error::ExecuteGrpcQuerySnafu)
157 }
158 }
159
160 #[tracing::instrument(skip_all)]
161 async fn traces(
162 &self,
163 pipeline_handler: PipelineHandlerRef,
164 request: ExportTraceServiceRequest,
165 pipeline: PipelineWay,
166 pipeline_params: GreptimePipelineParams,
167 table_name: String,
168 ctx: QueryContextRef,
169 ) -> ServerResult<TraceIngestOutcome> {
170 self.plugins
171 .get::<PermissionCheckerRef>()
172 .as_ref()
173 .check_permission(ctx.current_user(), PermissionReq::Otlp)
174 .context(AuthSnafu)?;
175
176 let interceptor_ref = self
177 .plugins
178 .get::<OpenTelemetryProtocolInterceptorRef<servers::error::Error>>();
179 interceptor_ref.pre_execute(ctx.clone())?;
180
181 let spans = otlp::trace::span::parse(request);
182 self.ingest_trace_spans(
183 pipeline_handler,
184 &pipeline,
185 &pipeline_params,
186 table_name,
187 spans,
188 ctx,
189 )
190 .await
191 }
192
193 #[tracing::instrument(skip_all)]
194 async fn logs(
195 &self,
196 pipeline_handler: PipelineHandlerRef,
197 request: ExportLogsServiceRequest,
198 pipeline: PipelineWay,
199 pipeline_params: GreptimePipelineParams,
200 table_name: String,
201 ctx: QueryContextRef,
202 ) -> ServerResult<Vec<Output>> {
203 self.plugins
204 .get::<PermissionCheckerRef>()
205 .as_ref()
206 .check_permission(ctx.current_user(), PermissionReq::Otlp)
207 .context(AuthSnafu)?;
208
209 let interceptor_ref = self
210 .plugins
211 .get::<OpenTelemetryProtocolInterceptorRef<servers::error::Error>>();
212 interceptor_ref.pre_execute(ctx.clone())?;
213
214 let opt_req = otlp::logs::to_grpc_insert_requests(
215 request,
216 pipeline,
217 pipeline_params,
218 table_name,
219 &ctx,
220 pipeline_handler,
221 )
222 .await?;
223
224 let mut outputs = vec![];
225
226 for (temp_ctx, requests) in opt_req.as_req_iter(ctx) {
227 let cnt = requests
228 .inserts
229 .iter()
230 .filter_map(|r| r.rows.as_ref().map(|r| r.rows.len()))
231 .sum::<usize>();
232
233 let o = self
234 .handle_log_inserts(requests, temp_ctx)
235 .await
236 .inspect(|_| OTLP_LOGS_ROWS.inc_by(cnt as u64))
237 .map_err(BoxedError::new)
238 .context(error::ExecuteGrpcQuerySnafu)?;
239 outputs.push(o);
240 }
241
242 Ok(outputs)
243 }
244}
245
246impl Instance {
247 async fn ingest_trace_spans(
250 &self,
251 pipeline_handler: PipelineHandlerRef,
252 pipeline: &PipelineWay,
253 pipeline_params: &GreptimePipelineParams,
254 table_name: String,
255 groups: Vec<TraceSpanGroup>,
256 ctx: QueryContextRef,
257 ) -> ServerResult<TraceIngestOutcome> {
258 let is_trace_v1_model = matches!(pipeline, PipelineWay::OtlpTraceDirectV1);
259 let ingest_ctx = TraceChunkIngestContext {
260 pipeline_handler,
261 pipeline,
262 pipeline_params,
263 table_name: &table_name,
264 is_trace_v1_model,
265 };
266 let mut ingest_state = TraceIngestState {
267 aux_data: TraceAuxData::default(),
268 outcome: TraceIngestOutcome::default(),
269 failure_messages: Vec::new(),
270 };
271
272 for group in groups {
273 let chunks = group
274 .spans
275 .into_iter()
276 .chunks(TRACE_INGEST_CHUNK_SIZE)
277 .into_iter()
278 .map(|chunk| chunk.collect::<Vec<_>>())
279 .collect::<Vec<_>>();
280 for chunk in chunks {
281 self.ingest_trace_chunk(&ingest_ctx, chunk, ctx.clone(), &mut ingest_state)
282 .await?;
283 }
284 }
285
286 OTLP_TRACES_ROWS.inc_by(ingest_state.outcome.accepted_spans as u64);
287
288 if !ingest_state.aux_data.is_empty() {
289 let (aux_requests, _) = otlp::trace::to_grpc_insert_requests_for_aux_tables(
293 std::mem::take(&mut ingest_state.aux_data),
294 ingest_ctx.pipeline,
295 ingest_ctx.table_name,
296 )?;
297
298 if !aux_requests.inserts.is_empty() {
299 match self
300 .insert_trace_requests(aux_requests, ingest_ctx.is_trace_v1_model, ctx)
301 .await
302 {
303 Ok(output) => {
304 Self::add_trace_write_cost(&mut ingest_state.outcome, output.meta.cost);
305 }
306 Err(err) => {
307 Self::push_trace_failure_message(
308 &mut ingest_state.failure_messages,
309 "aux_table_update_failed",
310 format!(
311 "Auxiliary trace tables were not fully updated ({})",
312 err.status_code().as_ref()
313 ),
314 );
315 }
316 }
317 }
318 }
319
320 ingest_state.outcome.error_message = Self::finish_trace_failure_message(
321 ingest_state.outcome.accepted_spans,
322 ingest_state.outcome.rejected_spans,
323 ingest_state.failure_messages,
324 );
325
326 Ok(ingest_state.outcome)
327 }
328
329 async fn ingest_trace_chunk(
332 &self,
333 ingest_ctx: &TraceChunkIngestContext<'_>,
334 chunk: Vec<TraceSpan>,
335 ctx: QueryContextRef,
336 ingest_state: &mut TraceIngestState,
337 ) -> ServerResult<()> {
338 let (requests, chunk_rows) = otlp::trace::to_grpc_insert_requests_from_spans(
341 &chunk,
342 ingest_ctx.pipeline,
343 ingest_ctx.pipeline_params,
344 ingest_ctx.table_name,
345 &ctx,
346 ingest_ctx.pipeline_handler.clone(),
347 )?;
348
349 match self
350 .insert_trace_requests(requests, ingest_ctx.is_trace_v1_model, ctx.clone())
351 .await
352 {
353 Ok(output) => {
354 Self::add_trace_write_cost(&mut ingest_state.outcome, output.meta.cost);
355 ingest_state.outcome.accepted_spans += chunk_rows;
356 for span in &chunk {
357 ingest_state.aux_data.observe_span(span);
358 }
359 }
360 Err(err) => match Self::classify_trace_chunk_failure(err.status_code()) {
361 ChunkFailureReaction::RetryPerSpan => {
362 Self::push_trace_failure_message(
363 &mut ingest_state.failure_messages,
364 ChunkFailureReaction::RetryPerSpan.as_metric_label(),
365 format!("Chunk fallback triggered by {}", err.status_code().as_ref()),
366 );
367 self.ingest_trace_chunk_span_by_span(
373 ingest_ctx,
374 chunk,
375 ctx.clone(),
376 ingest_state,
377 )
378 .await?;
379 }
380 ChunkFailureReaction::DiscardChunk => {
381 ingest_state.outcome.rejected_spans += chunk.len();
382 Self::push_trace_failure_message(
383 &mut ingest_state.failure_messages,
384 ChunkFailureReaction::DiscardChunk.as_metric_label(),
385 format!(
386 "Discarded {} spans after ambiguous chunk failure ({})",
387 chunk.len(),
388 err.status_code().as_ref()
389 ),
390 );
391 }
394 ChunkFailureReaction::Propagate => {
398 Self::push_trace_failure_message(
399 &mut ingest_state.failure_messages,
400 ChunkFailureReaction::Propagate.as_metric_label(),
401 format!(
402 "Propagating retryable chunk failure ({})",
403 err.status_code().as_ref()
404 ),
405 );
406 return Err(err);
407 }
408 },
409 }
410
411 Ok(())
412 }
413
414 async fn ingest_trace_chunk_span_by_span(
416 &self,
417 ingest_ctx: &TraceChunkIngestContext<'_>,
418 chunk: Vec<TraceSpan>,
419 ctx: QueryContextRef,
420 ingest_state: &mut TraceIngestState,
421 ) -> ServerResult<()> {
422 for span in chunk {
423 let (requests, rows) = otlp::trace::to_grpc_insert_requests_from_spans(
424 std::slice::from_ref(&span),
425 ingest_ctx.pipeline,
426 ingest_ctx.pipeline_params,
427 ingest_ctx.table_name,
428 &ctx,
429 ingest_ctx.pipeline_handler.clone(),
430 )?;
431
432 match self
433 .insert_trace_requests(requests, ingest_ctx.is_trace_v1_model, ctx.clone())
434 .await
435 {
436 Ok(output) => {
437 Self::add_trace_write_cost(&mut ingest_state.outcome, output.meta.cost);
438 ingest_state.outcome.accepted_spans += rows;
439 ingest_state.aux_data.observe_span(&span);
440 }
441 Err(err) => {
442 if Self::should_propagate_trace_span_failure(err.status_code()) {
443 Self::push_trace_failure_message(
444 &mut ingest_state.failure_messages,
445 ChunkFailureReaction::Propagate.as_metric_label(),
446 format!(
447 "Propagating retryable span failure for {}:{} ({})",
448 span.trace_id,
449 span.span_id,
450 err.status_code().as_ref()
451 ),
452 );
453 return Err(err);
454 }
455
456 ingest_state.outcome.rejected_spans += 1;
457 Self::push_trace_failure_message(
458 &mut ingest_state.failure_messages,
459 "span_rejected",
460 format!(
461 "Rejected span {}:{} ({})",
462 span.trace_id,
463 span.span_id,
464 err.status_code().as_ref()
465 ),
466 );
467 }
468 }
469 }
470
471 Ok(())
472 }
473
474 async fn insert_trace_requests(
476 &self,
477 mut requests: RowInsertRequests,
478 is_trace_v1_model: bool,
479 ctx: QueryContextRef,
480 ) -> ServerResult<Output> {
481 if is_trace_v1_model {
482 self.reconcile_trace_column_types(&mut requests, &ctx)
483 .await?;
484 self.handle_trace_inserts(requests, ctx)
485 .await
486 .map_err(BoxedError::new)
487 .context(error::ExecuteGrpcQuerySnafu)
488 } else {
489 self.handle_log_inserts(requests, ctx)
490 .await
491 .map_err(BoxedError::new)
492 .context(error::ExecuteGrpcQuerySnafu)
493 }
494 }
495
496 fn classify_trace_chunk_failure(status: StatusCode) -> ChunkFailureReaction {
497 match status {
498 StatusCode::InvalidArguments
499 | StatusCode::InvalidSyntax
500 | StatusCode::Unsupported
501 | StatusCode::TableNotFound
502 | StatusCode::TableColumnNotFound => ChunkFailureReaction::RetryPerSpan,
503 StatusCode::DatabaseNotFound => ChunkFailureReaction::DiscardChunk,
504 StatusCode::Cancelled | StatusCode::DeadlineExceeded => ChunkFailureReaction::Propagate,
505 _ if status.is_retryable() => ChunkFailureReaction::Propagate,
506 _ => ChunkFailureReaction::DiscardChunk,
507 }
508 }
509
510 fn should_propagate_trace_span_failure(status: StatusCode) -> bool {
511 matches!(
512 Self::classify_trace_chunk_failure(status),
513 ChunkFailureReaction::Propagate
514 )
515 }
516
517 fn add_trace_write_cost(outcome: &mut TraceIngestOutcome, cost: usize) {
518 outcome.write_cost += cost;
519 }
520
521 fn push_trace_failure_message(messages: &mut Vec<String>, label: &str, message: String) {
522 OTLP_TRACES_FAILURE_COUNT.with_label_values(&[label]).inc();
523
524 if messages.len() < TRACE_FAILURE_MESSAGE_LIMIT {
525 messages.push(message);
526 } else if messages.len() == TRACE_FAILURE_MESSAGE_LIMIT {
527 tracing::debug!(
528 label,
529 limit = TRACE_FAILURE_MESSAGE_LIMIT,
530 "Trace ingest failure message limit reached; suppressing additional failure details"
531 );
532 }
533 }
534
535 fn finish_trace_failure_message(
536 accepted_spans: usize,
537 rejected_spans: usize,
538 messages: Vec<String>,
539 ) -> Option<String> {
540 if rejected_spans == 0 && messages.is_empty() {
541 return None;
542 }
543
544 let mut summary = format!(
545 "Accepted {} spans, rejected {} spans",
546 accepted_spans, rejected_spans
547 );
548
549 if !messages.is_empty() {
550 summary.push_str(": ");
551 summary.push_str(&messages.join("; "));
552 }
553
554 Some(summary)
555 }
556
557 async fn alter_trace_table_columns_to_float64(
559 &self,
560 ctx: &QueryContextRef,
561 table_name: &str,
562 column_names: &[String],
563 ) -> ServerResult<()> {
564 let catalog_name = ctx.current_catalog().to_string();
565 let schema_name = ctx.current_schema();
566 let alter_expr = AlterTableExpr {
567 catalog_name: catalog_name.clone(),
568 schema_name: schema_name.clone(),
569 table_name: table_name.to_string(),
570 kind: Some(Kind::ModifyColumnTypes(ModifyColumnTypes {
571 modify_column_types: column_names
572 .iter()
573 .map(|column_name| ModifyColumnType {
574 column_name: column_name.clone(),
575 target_type: ColumnDataType::Float64 as i32,
576 target_type_extension: None,
577 })
578 .collect(),
579 })),
580 };
581
582 if let Err(err) = self
583 .statement_executor
584 .alter_table_inner(alter_expr, ctx.clone())
585 .await
586 {
587 let table = self
588 .catalog_manager
589 .table(&catalog_name, &schema_name, table_name, None)
590 .await
591 .map_err(servers::error::Error::from)?;
592 let alter_already_applied = table
593 .map(|table| {
594 let table_schema = table.schema();
595 column_names.iter().all(|column_name| {
596 table_schema
597 .column_schema_by_name(column_name)
598 .and_then(|table_col| {
599 ColumnDataTypeWrapper::try_from(table_col.data_type.clone())
600 .ok()
601 .map(|wrapper| wrapper.datatype())
602 })
603 == Some(ColumnDataType::Float64)
604 })
605 })
606 .unwrap_or(false);
607
608 if alter_already_applied {
609 return Ok(());
610 }
611
612 warn!(
613 table_name,
614 columns = ?column_names,
615 error = %err,
616 "failed to widen trace columns before insert"
617 );
618
619 return Err(wrap_trace_alter_failure(err));
620 }
621
622 Ok(())
623 }
624
625 async fn reconcile_trace_column_types(
629 &self,
630 requests: &mut RowInsertRequests,
631 ctx: &QueryContextRef,
632 ) -> ServerResult<()> {
633 let catalog = ctx.current_catalog();
634 let schema = ctx.current_schema();
635
636 for req in &mut requests.inserts {
637 let table = self
638 .catalog_manager
639 .table(catalog, &schema, &req.table_name, None)
640 .await?;
641
642 let Some(rows) = req.rows.as_mut() else {
643 continue;
644 };
645
646 let table_schema = table.map(|table| table.schema());
647 let mut pending_rewrites = Vec::new();
648 let mut pending_alter_columns = Vec::new();
649
650 for (col_idx, col_schema) in rows.schema.iter().enumerate() {
651 let Some(current_type) = ColumnDataType::try_from(col_schema.datatype).ok() else {
652 continue;
653 };
654
655 let mut observed_types = Vec::new();
656 push_observed_trace_type(&mut observed_types, current_type);
657
658 for row in &rows.rows {
661 let Some(value) = row
662 .values
663 .get(col_idx)
664 .and_then(|value| value.value_data.as_ref())
665 else {
666 continue;
667 };
668
669 let Some(value_type) = trace_value_datatype(value) else {
670 continue;
671 };
672 push_observed_trace_type(&mut observed_types, value_type);
673 }
674
675 let existing_type = table_schema
676 .as_ref()
677 .and_then(|schema| schema.column_schema_by_name(&col_schema.column_name))
678 .and_then(|table_col| {
679 ColumnDataTypeWrapper::try_from(table_col.data_type.clone())
680 .ok()
681 .map(|wrapper| wrapper.datatype())
682 });
683 let fixed_type = trace_semconv_fixed_type(&col_schema.column_name);
684
685 if !observed_types
686 .iter()
687 .copied()
688 .any(is_trace_reconcile_candidate_type)
689 && existing_type
690 .map(|datatype| !is_trace_reconcile_candidate_type(datatype))
691 .unwrap_or(true)
692 && fixed_type.is_none()
693 {
694 continue;
695 }
696
697 let Some(decision) = choose_trace_reconcile_decision(
700 &col_schema.column_name,
701 &observed_types,
702 existing_type,
703 )
704 .map_err(|_| {
705 enrich_trace_reconcile_error(
706 &req.table_name,
707 &col_schema.column_name,
708 &observed_types,
709 existing_type,
710 fixed_type,
711 )
712 })?
713 else {
714 continue;
715 };
716 let target_type = decision.target_type();
717
718 if !decision.requires_alter()
719 && observed_types
720 .iter()
721 .all(|observed| *observed == target_type)
722 && col_schema.datatype == target_type as i32
723 {
724 continue;
725 }
726
727 if decision.requires_alter()
728 && !pending_alter_columns.contains(&col_schema.column_name)
729 {
730 pending_alter_columns.push(col_schema.column_name.clone());
731 }
732
733 pending_rewrites.push(PendingTraceColumnRewrite {
734 col_idx,
735 target_type,
736 column_name: col_schema.column_name.clone(),
737 });
738 }
739
740 if pending_rewrites.is_empty() {
741 continue;
742 }
743
744 validate_trace_column_rewrites(&rows.rows, &pending_rewrites, &req.table_name)?;
745
746 if !pending_alter_columns.is_empty() {
747 self.alter_trace_table_columns_to_float64(
748 ctx,
749 &req.table_name,
750 &pending_alter_columns,
751 )
752 .await?;
753 }
754
755 for pending_rewrite in &pending_rewrites {
757 rows.schema[pending_rewrite.col_idx].datatype = pending_rewrite.target_type as i32;
758 }
759
760 for row in &mut rows.rows {
762 for pending_rewrite in &pending_rewrites {
763 let Some(value) = row.values.get_mut(pending_rewrite.col_idx) else {
764 continue;
765 };
766 let Some(request_type) =
767 value.value_data.as_ref().and_then(trace_value_datatype)
768 else {
769 continue;
770 };
771 if request_type == pending_rewrite.target_type {
772 continue;
773 }
774
775 value.value_data = coerce_value_data(
776 &value.value_data,
777 pending_rewrite.target_type,
778 request_type,
779 )
780 .map_err(|_| {
781 error::InvalidParameterSnafu {
782 reason: format!(
783 "failed to coerce trace column '{}' in table '{}' from {:?} to {:?}",
784 pending_rewrite.column_name,
785 req.table_name,
786 request_type,
787 pending_rewrite.target_type
788 ),
789 }
790 .build()
791 })?;
792 }
793 }
794 }
795
796 Ok(())
797 }
798}
799
800fn wrap_trace_alter_failure<E>(err: E) -> servers::error::Error
802where
803 E: ErrorExt + Send + Sync + 'static,
804{
805 error::ExecuteGrpcQuerySnafu.into_error(BoxedError::new(err))
806}
807
#[cfg(test)]
mod tests {
    use common_error::ext::ErrorExt;
    use common_error::status_code::StatusCode;
    use servers::query_handler::TraceIngestOutcome;

    use super::{ChunkFailureReaction, Instance, wrap_trace_alter_failure};
    use crate::metrics::OTLP_TRACES_FAILURE_COUNT;

    /// Checks each explicitly classified status code against its expected reaction.
    #[test]
    fn test_classify_trace_chunk_failure() {
        let cases = [
            (StatusCode::InvalidArguments, ChunkFailureReaction::RetryPerSpan),
            (StatusCode::InvalidSyntax, ChunkFailureReaction::RetryPerSpan),
            (StatusCode::Unsupported, ChunkFailureReaction::RetryPerSpan),
            (StatusCode::TableColumnNotFound, ChunkFailureReaction::RetryPerSpan),
            (StatusCode::TableNotFound, ChunkFailureReaction::RetryPerSpan),
            (StatusCode::DatabaseNotFound, ChunkFailureReaction::DiscardChunk),
            (StatusCode::DeadlineExceeded, ChunkFailureReaction::Propagate),
            (StatusCode::Cancelled, ChunkFailureReaction::Propagate),
            (StatusCode::StorageUnavailable, ChunkFailureReaction::Propagate),
            (StatusCode::Internal, ChunkFailureReaction::Propagate),
            (StatusCode::RegionNotReady, ChunkFailureReaction::Propagate),
            (StatusCode::TableUnavailable, ChunkFailureReaction::Propagate),
            (StatusCode::RegionBusy, ChunkFailureReaction::Propagate),
            (StatusCode::RuntimeResourcesExhausted, ChunkFailureReaction::Propagate),
        ];

        for (status, expected) in cases {
            assert_eq!(Instance::classify_trace_chunk_failure(status), expected);
        }
    }

    #[test]
    fn test_classify_trace_span_failure() {
        for status in [StatusCode::DeadlineExceeded, StatusCode::StorageUnavailable] {
            assert!(Instance::should_propagate_trace_span_failure(status));
        }
        assert!(!Instance::should_propagate_trace_span_failure(
            StatusCode::InvalidArguments
        ));
    }

    #[test]
    fn test_add_trace_write_cost() {
        let mut outcome = TraceIngestOutcome::default();
        for cost in [3, 5] {
            Instance::add_trace_write_cost(&mut outcome, cost);
        }
        assert_eq!(outcome.write_cost, 8);
    }

    #[test]
    fn test_finish_trace_failure_message() {
        let details = vec!["Rejected span trace:span (InvalidArguments)".to_string()];
        let message = Instance::finish_trace_failure_message(3, 2, details)
            .expect("rejected spans must produce a summary");
        assert!(message.contains("Accepted 3 spans, rejected 2 spans"));
        assert!(message.contains("Rejected span trace:span"));

        let clean = Instance::finish_trace_failure_message(2, 0, vec![]);
        assert_eq!(clean, None);
    }

    #[test]
    fn test_finish_trace_failure_message_without_detail_messages() {
        let expected = Some("Accepted 0 spans, rejected 2 spans".to_string());
        assert_eq!(Instance::finish_trace_failure_message(0, 2, vec![]), expected);
    }

    #[test]
    fn test_push_trace_failure_message_increments_labeled_counter() {
        let label = "retry_per_span_counter_test";
        let counter = || OTLP_TRACES_FAILURE_COUNT.with_label_values(&[label]).get();
        let before = counter();
        let mut messages = Vec::new();

        Instance::push_trace_failure_message(
            &mut messages,
            label,
            "Chunk fallback triggered by InvalidArguments".to_string(),
        );

        assert_eq!(messages.len(), 1);
        assert_eq!(counter(), before + 1);
    }

    #[test]
    fn test_push_trace_failure_message_caps_recorded_messages() {
        let label = "retry_per_span_limit_test";
        let mut messages = Vec::new();

        (0..5).for_each(|idx| {
            Instance::push_trace_failure_message(&mut messages, label, format!("failure-{idx}"));
        });

        let expected = (0..4)
            .map(|idx| format!("failure-{idx}"))
            .collect::<Vec<_>>();
        assert_eq!(messages.len(), 4);
        assert_eq!(messages, expected);
    }

    #[test]
    fn test_classify_trace_chunk_failure_defaults_to_discard() {
        let reaction = Instance::classify_trace_chunk_failure(StatusCode::Unknown);
        assert_eq!(reaction, ChunkFailureReaction::DiscardChunk);
    }

    #[test]
    fn test_wrap_trace_alter_failure_preserves_status_code() {
        let source = servers::error::TableNotFoundSnafu {
            catalog: "greptime".to_string(),
            schema: "public".to_string(),
            table: "trace_type_missing".to_string(),
        }
        .build();

        let wrapped = wrap_trace_alter_failure(source);
        assert_eq!(wrapped.status_code(), StatusCode::TableNotFound);
    }
}