servers/http/
jaeger.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::sync::Arc;
17
18use axum::extract::{Path, Query, State};
19use axum::http::{HeaderMap, StatusCode as HttpStatusCode};
20use axum::response::IntoResponse;
21use axum::Extension;
22use chrono::Utc;
23use common_catalog::consts::{PARENT_SPAN_ID_COLUMN, TRACE_TABLE_NAME};
24use common_error::ext::ErrorExt;
25use common_error::status_code::StatusCode;
26use common_query::{Output, OutputData};
27use common_recordbatch::util;
28use common_telemetry::{debug, error, tracing, warn};
29use serde::{Deserialize, Serialize};
30use serde_json::Value as JsonValue;
31use session::context::{Channel, QueryContext};
32use snafu::{OptionExt, ResultExt};
33
34use crate::error::{
35    status_code_to_http_status, CollectRecordbatchSnafu, Error, InvalidJaegerQuerySnafu, Result,
36};
37use crate::http::extractor::TraceTableName;
38use crate::http::HttpRecordsOutput;
39use crate::metrics::METRIC_JAEGER_QUERY_ELAPSED;
40use crate::otlp::trace::{
41    DURATION_NANO_COLUMN, KEY_OTEL_SCOPE_NAME, KEY_OTEL_SCOPE_VERSION, KEY_OTEL_STATUS_CODE,
42    KEY_SERVICE_NAME, KEY_SPAN_KIND, RESOURCE_ATTRIBUTES_COLUMN, SCOPE_NAME_COLUMN,
43    SCOPE_VERSION_COLUMN, SERVICE_NAME_COLUMN, SPAN_ATTRIBUTES_COLUMN, SPAN_EVENTS_COLUMN,
44    SPAN_ID_COLUMN, SPAN_KIND_COLUMN, SPAN_KIND_PREFIX, SPAN_NAME_COLUMN, SPAN_STATUS_CODE,
45    SPAN_STATUS_PREFIX, SPAN_STATUS_UNSET, TIMESTAMP_COLUMN, TRACE_ID_COLUMN,
46};
47use crate::query_handler::JaegerQueryHandlerRef;
48
/// Query-context extension key holding the name of the trace table a Jaeger request targets.
pub const JAEGER_QUERY_TABLE_NAME_KEY: &str = "jaeger_query_table_name";

/// Name of the custom request header carrying a time range for the operations endpoints
/// (presumably parsed by `parse_jaeger_time_range_for_operations`).
pub const JAEGER_TIME_RANGE_FOR_OPERATIONS_HEADER: &str = "x-greptime-jaeger-query-time-range";

/// Jaeger reference type linking a span to its parent span.
const REF_TYPE_CHILD_OF: &str = "CHILD_OF";

/// `chrono` formats tried, in order, when parsing the `time` string of a span event
/// (microsecond precision first, then nanosecond precision).
const SPAN_KIND_TIME_FMTS: [&str; 2] = ["%Y-%m-%d %H:%M:%S%.6f%z", "%Y-%m-%d %H:%M:%S%.9f%z"];
54
55/// JaegerAPIResponse is the response of Jaeger HTTP API.
56/// The original version is `structuredResponse` which is defined in https://github.com/jaegertracing/jaeger/blob/main/cmd/query/app/http_handler.go.
57#[derive(Default, Debug, Serialize, Deserialize, PartialEq)]
58pub struct JaegerAPIResponse {
59    pub data: Option<JaegerData>,
60    pub total: usize,
61    pub limit: usize,
62    pub offset: usize,
63    pub errors: Vec<JaegerAPIError>,
64}
65
66/// JaegerData is the query result of Jaeger HTTP API.
67#[derive(Debug, Serialize, Deserialize, PartialEq)]
68#[serde(untagged)]
69pub enum JaegerData {
70    ServiceNames(Vec<String>),
71    OperationsNames(Vec<String>),
72    Operations(Vec<Operation>),
73    Traces(Vec<Trace>),
74}
75
76/// JaegerAPIError is the error of Jaeger HTTP API.
77#[derive(Default, Debug, Serialize, Deserialize, PartialEq)]
78#[serde(rename_all = "camelCase")]
79pub struct JaegerAPIError {
80    pub code: i32,
81    pub msg: String,
82    #[serde(skip_serializing_if = "Option::is_none")]
83    pub trace_id: Option<String>,
84}
85
86/// Operation is an operation in a service.
87#[derive(Debug, Default, Serialize, Deserialize, PartialEq)]
88#[serde(rename_all = "camelCase")]
89pub struct Operation {
90    pub name: String,
91    #[serde(skip_serializing_if = "Option::is_none")]
92    pub span_kind: Option<String>,
93}
94
95/// Trace is a collection of spans.
96#[derive(Debug, Default, Serialize, Deserialize, PartialEq)]
97#[serde(rename_all = "camelCase")]
98pub struct Trace {
99    #[serde(rename = "traceID")]
100    pub trace_id: String,
101    pub spans: Vec<Span>,
102
103    #[serde(skip_serializing_if = "HashMap::is_empty")]
104    pub processes: HashMap<String, Process>,
105
106    #[serde(skip_serializing_if = "Vec::is_empty")]
107    pub warnings: Vec<String>,
108}
109
110/// Span is a single operation within a trace.
111#[derive(Debug, Default, Serialize, Deserialize, PartialEq)]
112#[serde(rename_all = "camelCase")]
113pub struct Span {
114    #[serde(rename = "traceID")]
115    pub trace_id: String,
116
117    #[serde(rename = "spanID")]
118    pub span_id: String,
119
120    #[serde(rename = "parentSpanID")]
121    #[serde(skip_serializing_if = "String::is_empty")]
122    pub parent_span_id: String,
123
124    #[serde(skip_serializing_if = "Option::is_none")]
125    pub flags: Option<u32>,
126
127    pub operation_name: String,
128    pub references: Vec<Reference>,
129    pub start_time: u64, // microseconds since unix epoch
130    pub duration: u64,   // microseconds
131    pub tags: Vec<KeyValue>,
132    pub logs: Vec<Log>,
133
134    #[serde(rename = "processID")]
135    #[serde(skip_serializing_if = "String::is_empty")]
136    pub process_id: String,
137
138    #[serde(skip_serializing_if = "Option::is_none")]
139    pub process: Option<Process>,
140
141    #[serde(skip_serializing_if = "Vec::is_empty")]
142    pub warnings: Vec<String>,
143}
144
145/// Reference is a reference from one span to another.
146#[derive(Debug, Serialize, Deserialize, PartialEq)]
147#[serde(rename_all = "camelCase")]
148pub struct Reference {
149    #[serde(rename = "traceID")]
150    pub trace_id: String,
151    #[serde(rename = "spanID")]
152    pub span_id: String,
153    pub ref_type: String,
154}
155
156/// Process is the process emitting a set of spans.
157#[derive(Debug, Serialize, Deserialize, PartialEq)]
158#[serde(rename_all = "camelCase")]
159pub struct Process {
160    pub service_name: String,
161    pub tags: Vec<KeyValue>,
162}
163
164/// Log is a log emitted in a span.
165#[derive(Debug, Serialize, Deserialize, PartialEq)]
166#[serde(rename_all = "camelCase")]
167pub struct Log {
168    pub timestamp: u64,
169    pub fields: Vec<KeyValue>,
170}
171
172/// KeyValue is a key-value pair with typed value.
173#[derive(Debug, Serialize, Deserialize, PartialEq)]
174#[serde(rename_all = "camelCase")]
175pub struct KeyValue {
176    pub key: String,
177    #[serde(rename = "type")]
178    pub value_type: ValueType,
179    pub value: Value,
180}
181
182/// Value is the value of a key-value pair in Jaeger Span attributes.
183#[derive(Debug, Serialize, Deserialize, PartialEq)]
184#[serde(untagged)]
185#[serde(rename_all = "camelCase")]
186pub enum Value {
187    String(String),
188    Int64(i64),
189    Float64(f64),
190    Boolean(bool),
191    Binary(Vec<u8>),
192}
193
194/// ValueType is the type of a value stored in KeyValue struct.
195#[derive(Debug, Serialize, Deserialize, PartialEq)]
196#[serde(rename_all = "lowercase")]
197pub enum ValueType {
198    String,
199    Int64,
200    Float64,
201    Boolean,
202    Binary,
203}
204
205/// JaegerQueryParams is the query parameters of Jaeger HTTP API.
206#[derive(Default, Debug, Serialize, Deserialize)]
207#[serde(rename_all = "camelCase")]
208pub struct JaegerQueryParams {
209    /// Service name of the trace.
210    #[serde(rename = "service")]
211    pub service_name: Option<String>,
212
213    /// Operation name of the trace.
214    #[serde(rename = "operation")]
215    pub operation_name: Option<String>,
216
217    /// Limit the return data.
218    pub limit: Option<usize>,
219
220    /// Start time of the trace in microseconds since unix epoch.
221    pub start: Option<i64>,
222
223    /// End time of the trace in microseconds since unix epoch.
224    pub end: Option<i64>,
225
226    /// Max duration string value of the trace. Units can be `ns`, `us` (or `µs`), `ms`, `s`, `m`, `h`.
227    pub max_duration: Option<String>,
228
229    /// Min duration string value of the trace. Units can be `ns`, `us` (or `µs`), `ms`, `s`, `m`, `h`.
230    pub min_duration: Option<String>,
231
232    /// Tags of the trace in JSON format. It will be URL encoded in the raw query.
233    /// The decoded format is like: tags="{\"http.status_code\":\"200\",\"latency\":\"11.234\",\"error\":\"false\",\"http.method\":\"GET\",\"http.path\":\"/api/v1/users\"}".
234    /// The key and value of the map are both strings. The key and value is the attribute name and value of the span. The value will be converted to the corresponding type when querying.
235    pub tags: Option<String>,
236
237    /// The span kind of the trace.
238    pub span_kind: Option<String>,
239}
240
241fn update_query_context(query_ctx: &mut QueryContext, table_name: Option<String>) {
242    // db should be already handled by middlewares
243    query_ctx.set_channel(Channel::Jaeger);
244    if let Some(table) = table_name {
245        query_ctx.set_extension(JAEGER_QUERY_TABLE_NAME_KEY, table);
246    }
247}
248
249impl QueryTraceParams {
250    fn from_jaeger_query_params(query_params: JaegerQueryParams) -> Result<Self> {
251        let mut internal_query_params: QueryTraceParams = QueryTraceParams {
252            service_name: query_params.service_name.context(InvalidJaegerQuerySnafu {
253                reason: "service_name is required".to_string(),
254            })?,
255            operation_name: query_params.operation_name,
256            // Convert start time from microseconds to nanoseconds.
257            start_time: query_params.start.map(|start| start * 1000),
258            end_time: query_params.end.map(|end| end * 1000),
259            ..Default::default()
260        };
261
262        if let Some(max_duration) = query_params.max_duration {
263            let duration = humantime::parse_duration(&max_duration).map_err(|e| {
264                InvalidJaegerQuerySnafu {
265                    reason: format!("parse maxDuration '{}' failed: {}", max_duration, e),
266                }
267                .build()
268            })?;
269            internal_query_params.max_duration = Some(duration.as_nanos() as u64);
270        }
271
272        if let Some(min_duration) = query_params.min_duration {
273            let duration = humantime::parse_duration(&min_duration).map_err(|e| {
274                InvalidJaegerQuerySnafu {
275                    reason: format!("parse minDuration '{}' failed: {}", min_duration, e),
276                }
277                .build()
278            })?;
279            internal_query_params.min_duration = Some(duration.as_nanos() as u64);
280        }
281
282        if let Some(tags) = query_params.tags {
283            // Serialize the tags to a JSON map.
284            let mut tags_map: HashMap<String, JsonValue> =
285                serde_json::from_str(&tags).map_err(|e| {
286                    InvalidJaegerQuerySnafu {
287                        reason: format!("parse tags '{}' failed: {}", tags, e),
288                    }
289                    .build()
290                })?;
291            for (_, v) in tags_map.iter_mut() {
292                if let Some(number) = convert_string_to_number(v) {
293                    *v = number;
294                }
295                if let Some(boolean) = convert_string_to_boolean(v) {
296                    *v = boolean;
297                }
298            }
299            internal_query_params.tags = Some(tags_map);
300        }
301
302        internal_query_params.limit = query_params.limit;
303
304        Ok(internal_query_params)
305    }
306}
307
308#[derive(Debug, Default, PartialEq)]
309pub struct QueryTraceParams {
310    pub service_name: String,
311    pub operation_name: Option<String>,
312
313    // The limit of the number of traces to return.
314    pub limit: Option<usize>,
315
316    // Select the traces with the given tags(span attributes).
317    pub tags: Option<HashMap<String, JsonValue>>,
318
319    // The unit of the following time related parameters is nanoseconds.
320    pub start_time: Option<i64>,
321    pub end_time: Option<i64>,
322    pub min_duration: Option<u64>,
323    pub max_duration: Option<u64>,
324}
325
326/// Handle the GET `/api/services` request.
327#[axum_macros::debug_handler]
328#[tracing::instrument(skip_all, fields(protocol = "jaeger", request_type = "get_services"))]
329pub async fn handle_get_services(
330    State(handler): State<JaegerQueryHandlerRef>,
331    Query(query_params): Query<JaegerQueryParams>,
332    Extension(mut query_ctx): Extension<QueryContext>,
333    TraceTableName(table_name): TraceTableName,
334) -> impl IntoResponse {
335    debug!(
336        "Received Jaeger '/api/services' request, query_params: {:?}, query_ctx: {:?}",
337        query_params, query_ctx
338    );
339
340    query_ctx.set_channel(Channel::Jaeger);
341    if let Some(table) = table_name {
342        query_ctx.set_extension(JAEGER_QUERY_TABLE_NAME_KEY, table);
343    }
344
345    let query_ctx = Arc::new(query_ctx);
346    let db = query_ctx.get_db_string();
347
348    // Record the query time histogram.
349    let _timer = METRIC_JAEGER_QUERY_ELAPSED
350        .with_label_values(&[&db, "/api/services"])
351        .start_timer();
352
353    match handler.get_services(query_ctx).await {
354        Ok(output) => match covert_to_records(output).await {
355            Ok(Some(records)) => match services_from_records(records) {
356                Ok(services) => {
357                    let services_num = services.len();
358                    (
359                        HttpStatusCode::OK,
360                        axum::Json(JaegerAPIResponse {
361                            data: Some(JaegerData::ServiceNames(services)),
362                            total: services_num,
363                            ..Default::default()
364                        }),
365                    )
366                }
367                Err(err) => {
368                    error!("Failed to get services: {:?}", err);
369                    error_response(err)
370                }
371            },
372            Ok(None) => (HttpStatusCode::OK, axum::Json(JaegerAPIResponse::default())),
373            Err(err) => {
374                error!("Failed to get services: {:?}", err);
375                error_response(err)
376            }
377        },
378        Err(err) => handle_query_error(err, "Failed to get services", &db),
379    }
380}
381
382/// Handle the GET `/api/traces/{trace_id}` request.
383#[axum_macros::debug_handler]
384#[tracing::instrument(skip_all, fields(protocol = "jaeger", request_type = "get_trace"))]
385pub async fn handle_get_trace(
386    State(handler): State<JaegerQueryHandlerRef>,
387    Path(trace_id): Path<String>,
388    Query(query_params): Query<JaegerQueryParams>,
389    Extension(mut query_ctx): Extension<QueryContext>,
390    TraceTableName(table_name): TraceTableName,
391) -> impl IntoResponse {
392    debug!(
393        "Received Jaeger '/api/traces/{}' request, query_params: {:?}, query_ctx: {:?}",
394        trace_id, query_params, query_ctx
395    );
396
397    update_query_context(&mut query_ctx, table_name);
398    let query_ctx = Arc::new(query_ctx);
399    let db = query_ctx.get_db_string();
400
401    // Record the query time histogram.
402    let _timer = METRIC_JAEGER_QUERY_ELAPSED
403        .with_label_values(&[&db, "/api/traces"])
404        .start_timer();
405
406    let output = match handler.get_trace(query_ctx, &trace_id).await {
407        Ok(output) => output,
408        Err(err) => {
409            return handle_query_error(
410                err,
411                &format!("Failed to get trace for '{}'", trace_id),
412                &db,
413            );
414        }
415    };
416
417    match covert_to_records(output).await {
418        Ok(Some(records)) => match traces_from_records(records) {
419            Ok(traces) => (
420                HttpStatusCode::OK,
421                axum::Json(JaegerAPIResponse {
422                    data: Some(JaegerData::Traces(traces)),
423                    ..Default::default()
424                }),
425            ),
426            Err(err) => {
427                error!("Failed to get trace '{}': {:?}", trace_id, err);
428                error_response(err)
429            }
430        },
431        Ok(None) => (HttpStatusCode::OK, axum::Json(JaegerAPIResponse::default())),
432        Err(err) => {
433            error!("Failed to get trace '{}': {:?}", trace_id, err);
434            error_response(err)
435        }
436    }
437}
438
439/// Handle the GET `/api/traces` request.
440#[axum_macros::debug_handler]
441#[tracing::instrument(skip_all, fields(protocol = "jaeger", request_type = "find_traces"))]
442pub async fn handle_find_traces(
443    State(handler): State<JaegerQueryHandlerRef>,
444    Query(query_params): Query<JaegerQueryParams>,
445    Extension(mut query_ctx): Extension<QueryContext>,
446    TraceTableName(table_name): TraceTableName,
447) -> impl IntoResponse {
448    debug!(
449        "Received Jaeger '/api/traces' request, query_params: {:?}, query_ctx: {:?}",
450        query_params, query_ctx
451    );
452
453    update_query_context(&mut query_ctx, table_name);
454    let query_ctx = Arc::new(query_ctx);
455    let db = query_ctx.get_db_string();
456
457    // Record the query time histogram.
458    let _timer = METRIC_JAEGER_QUERY_ELAPSED
459        .with_label_values(&[&db, "/api/traces"])
460        .start_timer();
461
462    match QueryTraceParams::from_jaeger_query_params(query_params) {
463        Ok(query_params) => {
464            let output = handler.find_traces(query_ctx, query_params).await;
465            match output {
466                Ok(output) => match covert_to_records(output).await {
467                    Ok(Some(records)) => match traces_from_records(records) {
468                        Ok(traces) => (
469                            HttpStatusCode::OK,
470                            axum::Json(JaegerAPIResponse {
471                                data: Some(JaegerData::Traces(traces)),
472                                ..Default::default()
473                            }),
474                        ),
475                        Err(err) => {
476                            error!("Failed to find traces: {:?}", err);
477                            error_response(err)
478                        }
479                    },
480                    Ok(None) => (HttpStatusCode::OK, axum::Json(JaegerAPIResponse::default())),
481                    Err(err) => error_response(err),
482                },
483                Err(err) => handle_query_error(err, "Failed to find traces", &db),
484            }
485        }
486        Err(e) => error_response(e),
487    }
488}
489
490/// Handle the GET `/api/operations` request.
491#[axum_macros::debug_handler]
492#[tracing::instrument(skip_all, fields(protocol = "jaeger", request_type = "get_operations"))]
493pub async fn handle_get_operations(
494    State(handler): State<JaegerQueryHandlerRef>,
495    Query(query_params): Query<JaegerQueryParams>,
496    Extension(mut query_ctx): Extension<QueryContext>,
497    TraceTableName(table_name): TraceTableName,
498    headers: HeaderMap,
499) -> impl IntoResponse {
500    debug!(
501        "Received Jaeger '/api/operations' request, query_params: {:?}, query_ctx: {:?}, headers: {:?}",
502        query_params, query_ctx, headers
503    );
504
505    let (start, end) = match parse_jaeger_time_range_for_operations(&headers, &query_params) {
506        Ok((start, end)) => (start, end),
507        Err(e) => return error_response(e),
508    };
509
510    debug!("Get operations with start: {:?}, end: {:?}", start, end);
511
512    if let Some(service_name) = &query_params.service_name {
513        update_query_context(&mut query_ctx, table_name);
514        let query_ctx = Arc::new(query_ctx);
515        let db = query_ctx.get_db_string();
516
517        // Record the query time histogram.
518        let _timer = METRIC_JAEGER_QUERY_ELAPSED
519            .with_label_values(&[&db, "/api/operations"])
520            .start_timer();
521
522        match handler
523            .get_operations(
524                query_ctx,
525                service_name,
526                query_params.span_kind.as_deref(),
527                start,
528                end,
529            )
530            .await
531        {
532            Ok(output) => match covert_to_records(output).await {
533                Ok(Some(records)) => match operations_from_records(records, true) {
534                    Ok(operations) => {
535                        let total = operations.len();
536                        (
537                            HttpStatusCode::OK,
538                            axum::Json(JaegerAPIResponse {
539                                data: Some(JaegerData::Operations(operations)),
540                                total,
541                                ..Default::default()
542                            }),
543                        )
544                    }
545                    Err(err) => {
546                        error!("Failed to get operations: {:?}", err);
547                        error_response(err)
548                    }
549                },
550                Ok(None) => (HttpStatusCode::OK, axum::Json(JaegerAPIResponse::default())),
551                Err(err) => error_response(err),
552            },
553            Err(err) => handle_query_error(
554                err,
555                &format!("Failed to get operations for service '{}'", service_name),
556                &db,
557            ),
558        }
559    } else {
560        (
561            HttpStatusCode::BAD_REQUEST,
562            axum::Json(JaegerAPIResponse {
563                errors: vec![JaegerAPIError {
564                    code: 400,
565                    msg: "parameter 'service' is required".to_string(),
566                    trace_id: None,
567                }],
568                ..Default::default()
569            }),
570        )
571    }
572}
573
574/// Handle the GET `/api/services/{service_name}/operations` request.
575#[axum_macros::debug_handler]
576#[tracing::instrument(
577    skip_all,
578    fields(protocol = "jaeger", request_type = "get_operations_by_service")
579)]
580pub async fn handle_get_operations_by_service(
581    State(handler): State<JaegerQueryHandlerRef>,
582    Path(service_name): Path<String>,
583    Query(query_params): Query<JaegerQueryParams>,
584    Extension(mut query_ctx): Extension<QueryContext>,
585    TraceTableName(table_name): TraceTableName,
586    headers: HeaderMap,
587) -> impl IntoResponse {
588    debug!(
589        "Received Jaeger '/api/services/{}/operations' request, query_params: {:?}, query_ctx: {:?}, headers: {:?}",
590        service_name, query_params, query_ctx, headers
591    );
592
593    update_query_context(&mut query_ctx, table_name);
594    let query_ctx = Arc::new(query_ctx);
595    let db = query_ctx.get_db_string();
596
597    // Record the query time histogram.
598    let _timer = METRIC_JAEGER_QUERY_ELAPSED
599        .with_label_values(&[&db, "/api/services"])
600        .start_timer();
601
602    let (start, end) = match parse_jaeger_time_range_for_operations(&headers, &query_params) {
603        Ok((start, end)) => (start, end),
604        Err(e) => return error_response(e),
605    };
606
607    match handler
608        .get_operations(query_ctx, &service_name, None, start, end)
609        .await
610    {
611        Ok(output) => match covert_to_records(output).await {
612            Ok(Some(records)) => match operations_from_records(records, false) {
613                Ok(operations) => {
614                    let operations: Vec<String> =
615                        operations.into_iter().map(|op| op.name).collect();
616                    let total = operations.len();
617                    (
618                        HttpStatusCode::OK,
619                        axum::Json(JaegerAPIResponse {
620                            data: Some(JaegerData::OperationsNames(operations)),
621                            total,
622                            ..Default::default()
623                        }),
624                    )
625                }
626                Err(err) => {
627                    error!(
628                        "Failed to get operations for service '{}': {:?}",
629                        service_name, err
630                    );
631                    error_response(err)
632                }
633            },
634            Ok(None) => (HttpStatusCode::OK, axum::Json(JaegerAPIResponse::default())),
635            Err(err) => error_response(err),
636        },
637        Err(err) => handle_query_error(
638            err,
639            &format!("Failed to get operations for service '{}'", service_name),
640            &db,
641        ),
642    }
643}
644
645async fn covert_to_records(output: Output) -> Result<Option<HttpRecordsOutput>> {
646    match output.data {
647        OutputData::Stream(stream) => {
648            let records = HttpRecordsOutput::try_new(
649                stream.schema().clone(),
650                util::collect(stream)
651                    .await
652                    .context(CollectRecordbatchSnafu)?,
653            )?;
654            debug!("The query records: {:?}", records);
655            Ok(Some(records))
656        }
657        // It's unlikely to happen. However, if the output is not a stream, return None.
658        _ => Ok(None),
659    }
660}
661
662fn handle_query_error(
663    err: Error,
664    prompt: &str,
665    db: &str,
666) -> (HttpStatusCode, axum::Json<JaegerAPIResponse>) {
667    // To compatible with the Jaeger API, if the trace table is not found, return an empty response instead of an error.
668    if err.status_code() == StatusCode::TableNotFound {
669        warn!(
670            "No trace table '{}' found in database '{}'",
671            TRACE_TABLE_NAME, db
672        );
673        (HttpStatusCode::OK, axum::Json(JaegerAPIResponse::default()))
674    } else {
675        error!("{}: {:?}", prompt, err);
676        error_response(err)
677    }
678}
679
680fn error_response(err: Error) -> (HttpStatusCode, axum::Json<JaegerAPIResponse>) {
681    (
682        status_code_to_http_status(&err.status_code()),
683        axum::Json(JaegerAPIResponse {
684            errors: vec![JaegerAPIError {
685                code: err.status_code() as i32,
686                msg: err.to_string(),
687                ..Default::default()
688            }],
689            ..Default::default()
690        }),
691    )
692}
693
694fn traces_from_records(records: HttpRecordsOutput) -> Result<Vec<Trace>> {
695    // maintain the mapping: trace_id -> (process_id -> service_name).
696    let mut trace_id_to_processes: HashMap<String, HashMap<String, String>> = HashMap::new();
697    // maintain the mapping: trace_id -> spans.
698    let mut trace_id_to_spans: HashMap<String, Vec<Span>> = HashMap::new();
699    // maintain the mapping: service.name -> resource.attributes.
700    let mut service_to_resource_attributes: HashMap<String, Vec<KeyValue>> = HashMap::new();
701
702    let is_span_attributes_flatten = !records
703        .schema
704        .column_schemas
705        .iter()
706        .any(|c| c.name == SPAN_ATTRIBUTES_COLUMN);
707
708    for row in records.rows.into_iter() {
709        let mut span = Span::default();
710        let mut service_name = None;
711        let mut resource_tags = vec![];
712
713        for (idx, cell) in row.into_iter().enumerate() {
714            // safe to use index here
715            let column_name = &records.schema.column_schemas[idx].name;
716
717            match column_name.as_str() {
718                TRACE_ID_COLUMN => {
719                    if let JsonValue::String(trace_id) = cell {
720                        span.trace_id = trace_id.clone();
721                        trace_id_to_processes.entry(trace_id).or_default();
722                    }
723                }
724                TIMESTAMP_COLUMN => {
725                    span.start_time = cell.as_u64().context(InvalidJaegerQuerySnafu {
726                        reason: "Failed to convert timestamp to u64".to_string(),
727                    })? / 1000;
728                }
729                DURATION_NANO_COLUMN => {
730                    span.duration = cell.as_u64().context(InvalidJaegerQuerySnafu {
731                        reason: "Failed to convert duration to u64".to_string(),
732                    })? / 1000;
733                }
734                SERVICE_NAME_COLUMN => {
735                    if let JsonValue::String(name) = cell {
736                        service_name = Some(name);
737                    }
738                }
739                SPAN_NAME_COLUMN => {
740                    if let JsonValue::String(span_name) = cell {
741                        span.operation_name = span_name;
742                    }
743                }
744                SPAN_ID_COLUMN => {
745                    if let JsonValue::String(span_id) = cell {
746                        span.span_id = span_id;
747                    }
748                }
749                SPAN_ATTRIBUTES_COLUMN => {
750                    // for v0 data model, span_attributes are nested as a json
751                    // data structure
752                    if let JsonValue::Object(span_attrs) = cell {
753                        span.tags.extend(object_to_tags(span_attrs));
754                    }
755                }
756                RESOURCE_ATTRIBUTES_COLUMN => {
757                    // for v0 data model, resource_attributes are nested as a json
758                    // data structure
759
760                    if let JsonValue::Object(mut resource_attrs) = cell {
761                        resource_attrs.remove(KEY_SERVICE_NAME);
762                        resource_tags = object_to_tags(resource_attrs);
763                    }
764                }
765                PARENT_SPAN_ID_COLUMN => {
766                    if let JsonValue::String(parent_span_id) = cell {
767                        if !parent_span_id.is_empty() {
768                            span.references.push(Reference {
769                                trace_id: span.trace_id.clone(),
770                                span_id: parent_span_id,
771                                ref_type: REF_TYPE_CHILD_OF.to_string(),
772                            });
773                        }
774                    }
775                }
776                SPAN_EVENTS_COLUMN => {
777                    if let JsonValue::Array(events) = cell {
778                        for event in events {
779                            if let JsonValue::Object(mut obj) = event {
780                                let Some(action) = obj.get("name").and_then(|v| v.as_str()) else {
781                                    continue;
782                                };
783
784                                let Some(t) =
785                                    obj.get("time").and_then(|t| t.as_str()).and_then(|s| {
786                                        SPAN_KIND_TIME_FMTS
787                                            .iter()
788                                            .find_map(|fmt| {
789                                                chrono::DateTime::parse_from_str(s, fmt).ok()
790                                            })
791                                            .map(|dt| dt.timestamp_micros() as u64)
792                                    })
793                                else {
794                                    continue;
795                                };
796
797                                let mut fields = vec![KeyValue {
798                                    key: "event".to_string(),
799                                    value_type: ValueType::String,
800                                    value: Value::String(action.to_string()),
801                                }];
802
803                                // Add event attributes as fields
804                                if let Some(JsonValue::Object(attrs)) = obj.remove("attributes") {
805                                    fields.extend(object_to_tags(attrs));
806                                }
807
808                                span.logs.push(Log {
809                                    timestamp: t,
810                                    fields,
811                                });
812                            }
813                        }
814                    }
815                }
816                SCOPE_NAME_COLUMN => {
817                    if let JsonValue::String(scope_name) = cell {
818                        if !scope_name.is_empty() {
819                            span.tags.push(KeyValue {
820                                key: KEY_OTEL_SCOPE_NAME.to_string(),
821                                value_type: ValueType::String,
822                                value: Value::String(scope_name),
823                            });
824                        }
825                    }
826                }
827                SCOPE_VERSION_COLUMN => {
828                    if let JsonValue::String(scope_version) = cell {
829                        if !scope_version.is_empty() {
830                            span.tags.push(KeyValue {
831                                key: KEY_OTEL_SCOPE_VERSION.to_string(),
832                                value_type: ValueType::String,
833                                value: Value::String(scope_version),
834                            });
835                        }
836                    }
837                }
838                SPAN_KIND_COLUMN => {
839                    if let JsonValue::String(span_kind) = cell {
840                        if !span_kind.is_empty() {
841                            span.tags.push(KeyValue {
842                                key: KEY_SPAN_KIND.to_string(),
843                                value_type: ValueType::String,
844                                value: Value::String(normalize_span_kind(&span_kind)),
845                            });
846                        }
847                    }
848                }
849                SPAN_STATUS_CODE => {
850                    if let JsonValue::String(span_status) = cell {
851                        if span_status != SPAN_STATUS_UNSET && !span_status.is_empty() {
852                            span.tags.push(KeyValue {
853                                key: KEY_OTEL_STATUS_CODE.to_string(),
854                                value_type: ValueType::String,
855                                value: Value::String(normalize_status_code(&span_status)),
856                            });
857                        }
858                    }
859                }
860
861                _ => {
862                    // this this v1 data model
863                    if is_span_attributes_flatten {
864                        const SPAN_ATTR_PREFIX: &str = "span_attributes.";
865                        const RESOURCE_ATTR_PREFIX: &str = "resource_attributes.";
866                        // a span attributes column
867                        if column_name.starts_with(SPAN_ATTR_PREFIX) {
868                            if let Some(keyvalue) = to_keyvalue(
869                                column_name
870                                    .strip_prefix(SPAN_ATTR_PREFIX)
871                                    .unwrap_or_default()
872                                    .to_string(),
873                                cell,
874                            ) {
875                                span.tags.push(keyvalue);
876                            }
877                        } else if column_name.starts_with(RESOURCE_ATTR_PREFIX) {
878                            if let Some(keyvalue) = to_keyvalue(
879                                column_name
880                                    .strip_prefix(RESOURCE_ATTR_PREFIX)
881                                    .unwrap_or_default()
882                                    .to_string(),
883                                cell,
884                            ) {
885                                resource_tags.push(keyvalue);
886                            }
887                        }
888                    }
889                }
890            }
891        }
892
893        if let Some(service_name) = service_name {
894            if !service_to_resource_attributes.contains_key(&service_name) {
895                service_to_resource_attributes.insert(service_name.clone(), resource_tags);
896            }
897
898            if let Some(process) = trace_id_to_processes.get_mut(&span.trace_id) {
899                if let Some(process_id) = process.get(&service_name) {
900                    span.process_id = process_id.clone();
901                } else {
902                    // Allocate a new process id.
903                    let process_id = format!("p{}", process.len() + 1);
904                    process.insert(service_name, process_id.clone());
905                    span.process_id = process_id;
906                }
907            }
908        }
909
910        // ensure span tags order
911        span.tags.sort_by(|a, b| a.key.cmp(&b.key));
912
913        if let Some(spans) = trace_id_to_spans.get_mut(&span.trace_id) {
914            spans.push(span);
915        } else {
916            trace_id_to_spans.insert(span.trace_id.clone(), vec![span]);
917        }
918    }
919
920    let mut traces = Vec::new();
921    for (trace_id, spans) in trace_id_to_spans {
922        let mut trace = Trace {
923            trace_id,
924            spans,
925            ..Default::default()
926        };
927
928        if let Some(processes) = trace_id_to_processes.remove(&trace.trace_id) {
929            let mut process_id_to_process = HashMap::new();
930            for (service_name, process_id) in processes.into_iter() {
931                let tags = service_to_resource_attributes
932                    .remove(&service_name)
933                    .unwrap_or_default();
934                process_id_to_process.insert(process_id, Process { service_name, tags });
935            }
936            trace.processes = process_id_to_process;
937        }
938        traces.push(trace);
939    }
940
941    Ok(traces)
942}
943
944fn to_keyvalue(key: String, value: JsonValue) -> Option<KeyValue> {
945    match value {
946        JsonValue::String(value) => Some(KeyValue {
947            key,
948            value_type: ValueType::String,
949            value: Value::String(value.to_string()),
950        }),
951        JsonValue::Number(value) => Some(KeyValue {
952            key,
953            value_type: ValueType::Int64,
954            value: Value::Int64(value.as_i64().unwrap_or(0)),
955        }),
956        JsonValue::Bool(value) => Some(KeyValue {
957            key,
958            value_type: ValueType::Boolean,
959            value: Value::Boolean(value),
960        }),
961        JsonValue::Array(value) => Some(KeyValue {
962            key,
963            value_type: ValueType::String,
964            value: Value::String(serde_json::to_string(&value).unwrap()),
965        }),
966        JsonValue::Object(value) => Some(KeyValue {
967            key,
968            value_type: ValueType::String,
969            value: Value::String(serde_json::to_string(&value).unwrap()),
970        }),
971        JsonValue::Null => None,
972    }
973}
974
975fn object_to_tags(object: serde_json::map::Map<String, JsonValue>) -> Vec<KeyValue> {
976    object
977        .into_iter()
978        .filter_map(|(key, value)| to_keyvalue(key, value))
979        .collect()
980}
981
982fn services_from_records(records: HttpRecordsOutput) -> Result<Vec<String>> {
983    let expected_schema = vec![(SERVICE_NAME_COLUMN, "String")];
984    check_schema(&records, &expected_schema)?;
985
986    let mut services = Vec::with_capacity(records.total_rows);
987    for row in records.rows.into_iter() {
988        for value in row.into_iter() {
989            if let JsonValue::String(service_name) = value {
990                services.push(service_name);
991            }
992        }
993    }
994    Ok(services)
995}
996
997// Construct Jaeger operations from records.
998fn operations_from_records(
999    records: HttpRecordsOutput,
1000    contain_span_kind: bool,
1001) -> Result<Vec<Operation>> {
1002    let expected_schema = vec![(SPAN_NAME_COLUMN, "String"), (SPAN_KIND_COLUMN, "String")];
1003    check_schema(&records, &expected_schema)?;
1004
1005    let mut operations = Vec::with_capacity(records.total_rows);
1006    for row in records.rows.into_iter() {
1007        let mut row_iter = row.into_iter();
1008        if let Some(JsonValue::String(operation)) = row_iter.next() {
1009            let mut operation = Operation {
1010                name: operation,
1011                span_kind: None,
1012            };
1013            if contain_span_kind {
1014                if let Some(JsonValue::String(span_kind)) = row_iter.next() {
1015                    operation.span_kind = Some(normalize_span_kind(&span_kind));
1016                }
1017            } else {
1018                // skip span kind.
1019                row_iter.next();
1020            }
1021            operations.push(operation);
1022        }
1023    }
1024
1025    Ok(operations)
1026}
1027
1028// Check whether the schema of the records is correct.
1029fn check_schema(records: &HttpRecordsOutput, expected_schema: &[(&str, &str)]) -> Result<()> {
1030    for (i, column) in records.schema.column_schemas.iter().enumerate() {
1031        if column.name != expected_schema[i].0 || column.data_type != expected_schema[i].1 {
1032            InvalidJaegerQuerySnafu {
1033                reason: "query result schema is not correct".to_string(),
1034            }
1035            .fail()?
1036        }
1037    }
1038    Ok(())
1039}
1040
1041// By default, the span kind is stored as `SPAN_KIND_<kind>` in GreptimeDB.
1042// However, in Jaeger API, the span kind is returned as `<kind>` which is the lowercase of the span kind and without the `SPAN_KIND_` prefix.
1043fn normalize_span_kind(span_kind: &str) -> String {
1044    // If the span_kind starts with `SPAN_KIND_` prefix, remove it and convert to lowercase.
1045    if let Some(stripped) = span_kind.strip_prefix(SPAN_KIND_PREFIX) {
1046        stripped.to_lowercase()
1047    } else {
1048        // It's unlikely to happen. However, we still convert it to lowercase for consistency.
1049        span_kind.to_lowercase()
1050    }
1051}
1052
1053// By default, the status code is stored as `STATUS_CODE_<code>` in GreptimeDB.
1054// However, in Jaeger API, the status code is returned as `<code>` without the `STATUS_CODE_` prefix.
1055fn normalize_status_code(status_code: &str) -> String {
1056    // If the span_kind starts with `SPAN_KIND_` prefix, remove it and convert to lowercase.
1057    if let Some(stripped) = status_code.strip_prefix(SPAN_STATUS_PREFIX) {
1058        stripped.to_string()
1059    } else {
1060        // It's unlikely to happen
1061        status_code.to_string()
1062    }
1063}
1064
1065fn convert_string_to_number(input: &serde_json::Value) -> Option<serde_json::Value> {
1066    if let Some(data) = input.as_str() {
1067        if let Ok(number) = data.parse::<i64>() {
1068            return Some(serde_json::Value::Number(serde_json::Number::from(number)));
1069        }
1070        if let Ok(number) = data.parse::<f64>() {
1071            if let Some(number) = serde_json::Number::from_f64(number) {
1072                return Some(serde_json::Value::Number(number));
1073            }
1074        }
1075    }
1076
1077    None
1078}
1079
1080fn convert_string_to_boolean(input: &serde_json::Value) -> Option<serde_json::Value> {
1081    if let Some(data) = input.as_str() {
1082        if data == "true" {
1083            return Some(serde_json::Value::Bool(true));
1084        }
1085        if data == "false" {
1086            return Some(serde_json::Value::Bool(false));
1087        }
1088    }
1089
1090    None
1091}
1092
1093fn parse_jaeger_time_range_for_operations(
1094    headers: &HeaderMap,
1095    query_params: &JaegerQueryParams,
1096) -> Result<(Option<i64>, Option<i64>)> {
1097    if let Some(time_range) = headers.get(JAEGER_TIME_RANGE_FOR_OPERATIONS_HEADER) {
1098        match time_range.to_str() {
1099            Ok(time_range) => match humantime::parse_duration(time_range) {
1100                Ok(duration) => {
1101                    debug!(
1102                        "Get operations with time range: {:?}, duration: {:?}",
1103                        time_range, duration
1104                    );
1105                    let now = Utc::now().timestamp_micros();
1106                    Ok((Some(now - duration.as_micros() as i64), Some(now)))
1107                }
1108                Err(e) => {
1109                    error!("Failed to parse time range header: {:?}", e);
1110                    Err(InvalidJaegerQuerySnafu {
1111                        reason: format!("invalid time range header: {:?}", time_range),
1112                    }
1113                    .build())
1114                }
1115            },
1116            Err(e) => {
1117                error!("Failed to convert time range header to string: {:?}", e);
1118                Err(InvalidJaegerQuerySnafu {
1119                    reason: format!("invalid time range header: {:?}", time_range),
1120                }
1121                .build())
1122            }
1123        }
1124    } else {
1125        Ok((query_params.start, query_params.end))
1126    }
1127}
1128
1129#[cfg(test)]
1130mod tests {
1131    use serde_json::{json, Number, Value as JsonValue};
1132
1133    use super::*;
1134    use crate::http::{ColumnSchema, HttpRecordsOutput, OutputSchema};
1135
1136    #[test]
1137    fn test_services_from_records() {
1138        // The tests is the tuple of `(test_records, expected)`.
1139        let tests = vec![(
1140            HttpRecordsOutput {
1141                schema: OutputSchema {
1142                    column_schemas: vec![ColumnSchema {
1143                        name: "service_name".to_string(),
1144                        data_type: "String".to_string(),
1145                    }],
1146                },
1147                rows: vec![
1148                    vec![JsonValue::String("test-service-0".to_string())],
1149                    vec![JsonValue::String("test-service-1".to_string())],
1150                ],
1151                total_rows: 2,
1152                metrics: HashMap::new(),
1153            },
1154            vec!["test-service-0".to_string(), "test-service-1".to_string()],
1155        )];
1156
1157        for (records, expected) in tests {
1158            let services = services_from_records(records).unwrap();
1159            assert_eq!(services, expected);
1160        }
1161    }
1162
1163    #[test]
1164    fn test_operations_from_records() {
1165        // The tests is the tuple of `(test_records, contain_span_kind, expected)`.
1166        let tests = vec![
1167            (
1168                HttpRecordsOutput {
1169                    schema: OutputSchema {
1170                        column_schemas: vec![
1171                            ColumnSchema {
1172                                name: "span_name".to_string(),
1173                                data_type: "String".to_string(),
1174                            },
1175                            ColumnSchema {
1176                                name: "span_kind".to_string(),
1177                                data_type: "String".to_string(),
1178                            },
1179                        ],
1180                    },
1181                    rows: vec![
1182                        vec![
1183                            JsonValue::String("access-mysql".to_string()),
1184                            JsonValue::String("SPAN_KIND_SERVER".to_string()),
1185                        ],
1186                        vec![
1187                            JsonValue::String("access-redis".to_string()),
1188                            JsonValue::String("SPAN_KIND_CLIENT".to_string()),
1189                        ],
1190                    ],
1191                    total_rows: 2,
1192                    metrics: HashMap::new(),
1193                },
1194                false,
1195                vec![
1196                    Operation {
1197                        name: "access-mysql".to_string(),
1198                        span_kind: None,
1199                    },
1200                    Operation {
1201                        name: "access-redis".to_string(),
1202                        span_kind: None,
1203                    },
1204                ],
1205            ),
1206            (
1207                HttpRecordsOutput {
1208                    schema: OutputSchema {
1209                        column_schemas: vec![
1210                            ColumnSchema {
1211                                name: "span_name".to_string(),
1212                                data_type: "String".to_string(),
1213                            },
1214                            ColumnSchema {
1215                                name: "span_kind".to_string(),
1216                                data_type: "String".to_string(),
1217                            },
1218                        ],
1219                    },
1220                    rows: vec![
1221                        vec![
1222                            JsonValue::String("access-mysql".to_string()),
1223                            JsonValue::String("SPAN_KIND_SERVER".to_string()),
1224                        ],
1225                        vec![
1226                            JsonValue::String("access-redis".to_string()),
1227                            JsonValue::String("SPAN_KIND_CLIENT".to_string()),
1228                        ],
1229                    ],
1230                    total_rows: 2,
1231                    metrics: HashMap::new(),
1232                },
1233                true,
1234                vec![
1235                    Operation {
1236                        name: "access-mysql".to_string(),
1237                        span_kind: Some("server".to_string()),
1238                    },
1239                    Operation {
1240                        name: "access-redis".to_string(),
1241                        span_kind: Some("client".to_string()),
1242                    },
1243                ],
1244            ),
1245        ];
1246
1247        for (records, contain_span_kind, expected) in tests {
1248            let operations = operations_from_records(records, contain_span_kind).unwrap();
1249            assert_eq!(operations, expected);
1250        }
1251    }
1252
1253    #[test]
1254    fn test_traces_from_records() {
1255        // The tests is the tuple of `(test_records, expected)`.
1256        let tests = vec![(
1257            HttpRecordsOutput {
1258                schema: OutputSchema {
1259                    column_schemas: vec![
1260                        ColumnSchema {
1261                            name: "trace_id".to_string(),
1262                            data_type: "String".to_string(),
1263                        },
1264                        ColumnSchema {
1265                            name: "timestamp".to_string(),
1266                            data_type: "TimestampNanosecond".to_string(),
1267                        },
1268                        ColumnSchema {
1269                            name: "duration_nano".to_string(),
1270                            data_type: "UInt64".to_string(),
1271                        },
1272                        ColumnSchema {
1273                            name: "service_name".to_string(),
1274                            data_type: "String".to_string(),
1275                        },
1276                        ColumnSchema {
1277                            name: "span_name".to_string(),
1278                            data_type: "String".to_string(),
1279                        },
1280                        ColumnSchema {
1281                            name: "span_id".to_string(),
1282                            data_type: "String".to_string(),
1283                        },
1284                        ColumnSchema {
1285                            name: "span_attributes".to_string(),
1286                            data_type: "Json".to_string(),
1287                        },
1288                    ],
1289                },
1290                rows: vec![
1291                    vec![
1292                        JsonValue::String("5611dce1bc9ebed65352d99a027b08ea".to_string()),
1293                        JsonValue::Number(Number::from_u128(1738726754492422000).unwrap()),
1294                        JsonValue::Number(Number::from_u128(100000000).unwrap()),
1295                        JsonValue::String("test-service-0".to_string()),
1296                        JsonValue::String("access-mysql".to_string()),
1297                        JsonValue::String("008421dbbd33a3e9".to_string()),
1298                        JsonValue::Object(
1299                            json!({
1300                                "operation.type": "access-mysql",
1301                            })
1302                            .as_object()
1303                            .unwrap()
1304                            .clone(),
1305                        ),
1306                    ],
1307                    vec![
1308                        JsonValue::String("5611dce1bc9ebed65352d99a027b08ea".to_string()),
1309                        JsonValue::Number(Number::from_u128(1738726754642422000).unwrap()),
1310                        JsonValue::Number(Number::from_u128(100000000).unwrap()),
1311                        JsonValue::String("test-service-0".to_string()),
1312                        JsonValue::String("access-redis".to_string()),
1313                        JsonValue::String("ffa03416a7b9ea48".to_string()),
1314                        JsonValue::Object(
1315                            json!({
1316                                "operation.type": "access-redis",
1317                            })
1318                            .as_object()
1319                            .unwrap()
1320                            .clone(),
1321                        ),
1322                    ],
1323                ],
1324                total_rows: 2,
1325                metrics: HashMap::new(),
1326            },
1327            vec![Trace {
1328                trace_id: "5611dce1bc9ebed65352d99a027b08ea".to_string(),
1329                spans: vec![
1330                    Span {
1331                        trace_id: "5611dce1bc9ebed65352d99a027b08ea".to_string(),
1332                        span_id: "008421dbbd33a3e9".to_string(),
1333                        operation_name: "access-mysql".to_string(),
1334                        start_time: 1738726754492422,
1335                        duration: 100000,
1336                        tags: vec![KeyValue {
1337                            key: "operation.type".to_string(),
1338                            value_type: ValueType::String,
1339                            value: Value::String("access-mysql".to_string()),
1340                        }],
1341                        process_id: "p1".to_string(),
1342                        ..Default::default()
1343                    },
1344                    Span {
1345                        trace_id: "5611dce1bc9ebed65352d99a027b08ea".to_string(),
1346                        span_id: "ffa03416a7b9ea48".to_string(),
1347                        operation_name: "access-redis".to_string(),
1348                        start_time: 1738726754642422,
1349                        duration: 100000,
1350                        tags: vec![KeyValue {
1351                            key: "operation.type".to_string(),
1352                            value_type: ValueType::String,
1353                            value: Value::String("access-redis".to_string()),
1354                        }],
1355                        process_id: "p1".to_string(),
1356                        ..Default::default()
1357                    },
1358                ],
1359                processes: HashMap::from([(
1360                    "p1".to_string(),
1361                    Process {
1362                        service_name: "test-service-0".to_string(),
1363                        tags: vec![],
1364                    },
1365                )]),
1366                ..Default::default()
1367            }],
1368        )];
1369
1370        for (records, expected) in tests {
1371            let traces = traces_from_records(records).unwrap();
1372            assert_eq!(traces, expected);
1373        }
1374    }
1375
1376    #[test]
1377    fn test_traces_from_v1_records() {
1378        // The tests is the tuple of `(test_records, expected)`.
1379        let tests = vec![(
1380            HttpRecordsOutput {
1381                schema: OutputSchema {
1382                    column_schemas: vec![
1383                        ColumnSchema {
1384                            name: "trace_id".to_string(),
1385                            data_type: "String".to_string(),
1386                        },
1387                        ColumnSchema {
1388                            name: "timestamp".to_string(),
1389                            data_type: "TimestampNanosecond".to_string(),
1390                        },
1391                        ColumnSchema {
1392                            name: "duration_nano".to_string(),
1393                            data_type: "UInt64".to_string(),
1394                        },
1395                        ColumnSchema {
1396                            name: "service_name".to_string(),
1397                            data_type: "String".to_string(),
1398                        },
1399                        ColumnSchema {
1400                            name: "span_name".to_string(),
1401                            data_type: "String".to_string(),
1402                        },
1403                        ColumnSchema {
1404                            name: "span_id".to_string(),
1405                            data_type: "String".to_string(),
1406                        },
1407                        ColumnSchema {
1408                            name: "span_attributes.http.request.method".to_string(),
1409                            data_type: "String".to_string(),
1410                        },
1411                        ColumnSchema {
1412                            name: "span_attributes.http.request.url".to_string(),
1413                            data_type: "String".to_string(),
1414                        },
1415                        ColumnSchema {
1416                            name: "span_attributes.http.status_code".to_string(),
1417                            data_type: "UInt64".to_string(),
1418                        },
1419                    ],
1420                },
1421                rows: vec![
1422                    vec![
1423                        JsonValue::String("5611dce1bc9ebed65352d99a027b08ea".to_string()),
1424                        JsonValue::Number(Number::from_u128(1738726754492422000).unwrap()),
1425                        JsonValue::Number(Number::from_u128(100000000).unwrap()),
1426                        JsonValue::String("test-service-0".to_string()),
1427                        JsonValue::String("access-mysql".to_string()),
1428                        JsonValue::String("008421dbbd33a3e9".to_string()),
1429                        JsonValue::String("GET".to_string()),
1430                        JsonValue::String("/data".to_string()),
1431                        JsonValue::Number(Number::from_u128(200).unwrap()),
1432                    ],
1433                    vec![
1434                        JsonValue::String("5611dce1bc9ebed65352d99a027b08ea".to_string()),
1435                        JsonValue::Number(Number::from_u128(1738726754642422000).unwrap()),
1436                        JsonValue::Number(Number::from_u128(100000000).unwrap()),
1437                        JsonValue::String("test-service-0".to_string()),
1438                        JsonValue::String("access-redis".to_string()),
1439                        JsonValue::String("ffa03416a7b9ea48".to_string()),
1440                        JsonValue::String("POST".to_string()),
1441                        JsonValue::String("/create".to_string()),
1442                        JsonValue::Number(Number::from_u128(400).unwrap()),
1443                    ],
1444                ],
1445                total_rows: 2,
1446                metrics: HashMap::new(),
1447            },
1448            vec![Trace {
1449                trace_id: "5611dce1bc9ebed65352d99a027b08ea".to_string(),
1450                spans: vec![
1451                    Span {
1452                        trace_id: "5611dce1bc9ebed65352d99a027b08ea".to_string(),
1453                        span_id: "008421dbbd33a3e9".to_string(),
1454                        operation_name: "access-mysql".to_string(),
1455                        start_time: 1738726754492422,
1456                        duration: 100000,
1457                        tags: vec![
1458                            KeyValue {
1459                                key: "http.request.method".to_string(),
1460                                value_type: ValueType::String,
1461                                value: Value::String("GET".to_string()),
1462                            },
1463                            KeyValue {
1464                                key: "http.request.url".to_string(),
1465                                value_type: ValueType::String,
1466                                value: Value::String("/data".to_string()),
1467                            },
1468                            KeyValue {
1469                                key: "http.status_code".to_string(),
1470                                value_type: ValueType::Int64,
1471                                value: Value::Int64(200),
1472                            },
1473                        ],
1474                        process_id: "p1".to_string(),
1475                        ..Default::default()
1476                    },
1477                    Span {
1478                        trace_id: "5611dce1bc9ebed65352d99a027b08ea".to_string(),
1479                        span_id: "ffa03416a7b9ea48".to_string(),
1480                        operation_name: "access-redis".to_string(),
1481                        start_time: 1738726754642422,
1482                        duration: 100000,
1483                        tags: vec![
1484                            KeyValue {
1485                                key: "http.request.method".to_string(),
1486                                value_type: ValueType::String,
1487                                value: Value::String("POST".to_string()),
1488                            },
1489                            KeyValue {
1490                                key: "http.request.url".to_string(),
1491                                value_type: ValueType::String,
1492                                value: Value::String("/create".to_string()),
1493                            },
1494                            KeyValue {
1495                                key: "http.status_code".to_string(),
1496                                value_type: ValueType::Int64,
1497                                value: Value::Int64(400),
1498                            },
1499                        ],
1500                        process_id: "p1".to_string(),
1501                        ..Default::default()
1502                    },
1503                ],
1504                processes: HashMap::from([(
1505                    "p1".to_string(),
1506                    Process {
1507                        service_name: "test-service-0".to_string(),
1508                        tags: vec![],
1509                    },
1510                )]),
1511                ..Default::default()
1512            }],
1513        )];
1514
1515        for (records, expected) in tests {
1516            let traces = traces_from_records(records).unwrap();
1517            assert_eq!(traces, expected);
1518        }
1519    }
1520
1521    #[test]
1522    fn test_from_jaeger_query_params() {
1523        // The tests is the tuple of `(test_query_params, expected)`.
1524        let tests = vec![
1525            (
1526                JaegerQueryParams {
1527                    service_name: Some("test-service-0".to_string()),
1528                    ..Default::default()
1529                },
1530                QueryTraceParams {
1531                    service_name: "test-service-0".to_string(),
1532                    ..Default::default()
1533                },
1534            ),
1535            (
1536                JaegerQueryParams {
1537                    service_name: Some("test-service-0".to_string()),
1538                    operation_name: Some("access-mysql".to_string()),
1539                    start: Some(1738726754492422),
1540                    end: Some(1738726754642422),
1541                    max_duration: Some("100ms".to_string()),
1542                    min_duration: Some("50ms".to_string()),
1543                    limit: Some(10),
1544                    tags: Some("{\"http.status_code\":\"200\",\"latency\":\"11.234\",\"error\":\"false\",\"http.method\":\"GET\",\"http.path\":\"/api/v1/users\"}".to_string()),
1545                    ..Default::default()
1546                },
1547                QueryTraceParams {
1548                    service_name: "test-service-0".to_string(),
1549                    operation_name: Some("access-mysql".to_string()),
1550                    start_time: Some(1738726754492422000),
1551                    end_time: Some(1738726754642422000),
1552                    min_duration: Some(50000000),
1553                    max_duration: Some(100000000),
1554                    limit: Some(10),
1555                    tags: Some(HashMap::from([
1556                        ("http.status_code".to_string(), JsonValue::Number(Number::from(200))),
1557                        ("latency".to_string(), JsonValue::Number(Number::from_f64(11.234).unwrap())),
1558                        ("error".to_string(), JsonValue::Bool(false)),
1559                        ("http.method".to_string(), JsonValue::String("GET".to_string())),
1560                        ("http.path".to_string(), JsonValue::String("/api/v1/users".to_string())),
1561                    ])),
1562                },
1563            ),
1564        ];
1565
1566        for (query_params, expected) in tests {
1567            let query_params = QueryTraceParams::from_jaeger_query_params(query_params).unwrap();
1568            assert_eq!(query_params, expected);
1569        }
1570    }
1571
1572    #[test]
1573    fn test_check_schema() {
1574        // The tests is the tuple of `(test_records, expected_schema, is_ok)`.
1575        let tests = vec![(
1576            HttpRecordsOutput {
1577                schema: OutputSchema {
1578                    column_schemas: vec![
1579                        ColumnSchema {
1580                            name: "trace_id".to_string(),
1581                            data_type: "String".to_string(),
1582                        },
1583                        ColumnSchema {
1584                            name: "timestamp".to_string(),
1585                            data_type: "TimestampNanosecond".to_string(),
1586                        },
1587                        ColumnSchema {
1588                            name: "duration_nano".to_string(),
1589                            data_type: "UInt64".to_string(),
1590                        },
1591                        ColumnSchema {
1592                            name: "service_name".to_string(),
1593                            data_type: "String".to_string(),
1594                        },
1595                        ColumnSchema {
1596                            name: "span_name".to_string(),
1597                            data_type: "String".to_string(),
1598                        },
1599                        ColumnSchema {
1600                            name: "span_id".to_string(),
1601                            data_type: "String".to_string(),
1602                        },
1603                        ColumnSchema {
1604                            name: "span_attributes".to_string(),
1605                            data_type: "Json".to_string(),
1606                        },
1607                    ],
1608                },
1609                rows: vec![],
1610                total_rows: 0,
1611                metrics: HashMap::new(),
1612            },
1613            vec![
1614                (TRACE_ID_COLUMN, "String"),
1615                (TIMESTAMP_COLUMN, "TimestampNanosecond"),
1616                (DURATION_NANO_COLUMN, "UInt64"),
1617                (SERVICE_NAME_COLUMN, "String"),
1618                (SPAN_NAME_COLUMN, "String"),
1619                (SPAN_ID_COLUMN, "String"),
1620                (SPAN_ATTRIBUTES_COLUMN, "Json"),
1621            ],
1622            true,
1623        )];
1624
1625        for (records, expected_schema, is_ok) in tests {
1626            let result = check_schema(&records, &expected_schema);
1627            assert_eq!(result.is_ok(), is_ok);
1628        }
1629    }
1630
1631    #[test]
1632    fn test_normalize_span_kind() {
1633        let tests = vec![
1634            ("SPAN_KIND_SERVER".to_string(), "server".to_string()),
1635            ("SPAN_KIND_CLIENT".to_string(), "client".to_string()),
1636        ];
1637
1638        for (input, expected) in tests {
1639            let result = normalize_span_kind(&input);
1640            assert_eq!(result, expected);
1641        }
1642    }
1643
1644    #[test]
1645    fn test_convert_string_to_number() {
1646        let tests = vec![
1647            (
1648                JsonValue::String("123".to_string()),
1649                Some(JsonValue::Number(Number::from(123))),
1650            ),
1651            (
1652                JsonValue::String("123.456".to_string()),
1653                Some(JsonValue::Number(Number::from_f64(123.456).unwrap())),
1654            ),
1655        ];
1656
1657        for (input, expected) in tests {
1658            let result = convert_string_to_number(&input);
1659            assert_eq!(result, expected);
1660        }
1661    }
1662
1663    #[test]
1664    fn test_convert_string_to_boolean() {
1665        let tests = vec![
1666            (
1667                JsonValue::String("true".to_string()),
1668                Some(JsonValue::Bool(true)),
1669            ),
1670            (
1671                JsonValue::String("false".to_string()),
1672                Some(JsonValue::Bool(false)),
1673            ),
1674        ];
1675
1676        for (input, expected) in tests {
1677            let result = convert_string_to_boolean(&input);
1678            assert_eq!(result, expected);
1679        }
1680    }
1681}