servers/http/
jaeger.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::fmt;
17use std::str::FromStr;
18use std::sync::Arc;
19
20use axum::extract::{Path, Query, State};
21use axum::http::{HeaderMap, StatusCode as HttpStatusCode};
22use axum::response::IntoResponse;
23use axum::Extension;
24use chrono::Utc;
25use common_catalog::consts::{PARENT_SPAN_ID_COLUMN, TRACE_TABLE_NAME};
26use common_error::ext::ErrorExt;
27use common_error::status_code::StatusCode;
28use common_query::{Output, OutputData};
29use common_recordbatch::util;
30use common_telemetry::{debug, error, tracing, warn};
31use serde::{de, Deserialize, Deserializer, Serialize};
32use serde_json::Value as JsonValue;
33use session::context::{Channel, QueryContext};
34use snafu::{OptionExt, ResultExt};
35
36use crate::error::{
37    status_code_to_http_status, CollectRecordbatchSnafu, Error, InvalidJaegerQuerySnafu, Result,
38};
39use crate::http::extractor::TraceTableName;
40use crate::http::HttpRecordsOutput;
41use crate::metrics::METRIC_JAEGER_QUERY_ELAPSED;
42use crate::otlp::trace::{
43    DURATION_NANO_COLUMN, KEY_OTEL_SCOPE_NAME, KEY_OTEL_SCOPE_VERSION, KEY_OTEL_STATUS_CODE,
44    KEY_SERVICE_NAME, KEY_SPAN_KIND, RESOURCE_ATTRIBUTES_COLUMN, SCOPE_NAME_COLUMN,
45    SCOPE_VERSION_COLUMN, SERVICE_NAME_COLUMN, SPAN_ATTRIBUTES_COLUMN, SPAN_EVENTS_COLUMN,
46    SPAN_ID_COLUMN, SPAN_KIND_COLUMN, SPAN_KIND_PREFIX, SPAN_NAME_COLUMN, SPAN_STATUS_CODE,
47    SPAN_STATUS_PREFIX, SPAN_STATUS_UNSET, TIMESTAMP_COLUMN, TRACE_ID_COLUMN,
48};
49use crate::query_handler::JaegerQueryHandlerRef;
50
/// Key of the query-context extension under which the Jaeger handlers store
/// the trace table name (see `update_query_context`).
pub const JAEGER_QUERY_TABLE_NAME_KEY: &str = "jaeger_query_table_name";

/// Reference type used for the link from a span to its parent span.
const REF_TYPE_CHILD_OF: &str = "CHILD_OF";
/// Timestamp formats tried, in order, when parsing the `time` field of a span
/// event (6 or 9 fractional-second digits, followed by a UTC offset).
/// NOTE(review): despite the name, this is used for span *event* times in
/// `traces_from_records`, not for span kinds.
const SPAN_KIND_TIME_FMTS: [&str; 2] = ["%Y-%m-%d %H:%M:%S%.6f%z", "%Y-%m-%d %H:%M:%S%.9f%z"];
/// Name of an HTTP header related to the time range of operations queries.
/// NOTE(review): presumably read by `parse_jaeger_time_range_for_operations`,
/// which is not visible here; confirm.
pub const JAEGER_TIME_RANGE_FOR_OPERATIONS_HEADER: &str = "x-greptime-jaeger-query-time-range";
56
/// JaegerAPIResponse is the response of Jaeger HTTP API.
/// The original version is `structuredResponse` which is defined in https://github.com/jaegertracing/jaeger/blob/main/cmd/query/app/http_handler.go.
///
/// `JaegerAPIResponse::default()` (no data, zero counters, no errors) is what
/// the handlers in this file return when a query produced nothing to report.
#[derive(Default, Debug, Serialize, Deserialize, PartialEq)]
pub struct JaegerAPIResponse {
    /// The query result. There is no `skip_serializing_if`, so `None` is
    /// still written to the JSON output.
    pub data: Option<JaegerData>,
    /// Number of returned items. The services and operations handlers set it
    /// to the length of the returned list; the trace handlers leave it at 0.
    pub total: usize,
    /// Left at its default (0) by the handlers visible in this file.
    pub limit: usize,
    /// Left at its default (0) by the handlers visible in this file.
    pub offset: usize,
    /// Errors encountered while serving the request; empty on success.
    pub errors: Vec<JaegerAPIError>,
}
67
/// JaegerData is the query result of Jaeger HTTP API.
///
/// The enum is `untagged`, so only the payload of the variant is serialized.
/// NOTE(review): `ServiceNames` and `OperationsNames` both wrap `Vec<String>`,
/// so they cannot be told apart when deserializing; serde tries untagged
/// variants in declaration order.
#[derive(Debug, Serialize, Deserialize, PartialEq)]
#[serde(untagged)]
pub enum JaegerData {
    /// List of service names, returned by `handle_get_services`.
    ServiceNames(Vec<String>),
    /// List of operation names, returned by `handle_get_operations_by_service`.
    OperationsNames(Vec<String>),
    /// List of operations, returned by `handle_get_operations`.
    Operations(Vec<Operation>),
    /// List of traces, returned by `handle_get_trace` and `handle_find_traces`.
    Traces(Vec<Trace>),
}
77
/// JaegerAPIError is the error of Jaeger HTTP API.
///
/// Fields are serialized with camelCase names.
#[derive(Default, Debug, Serialize, Deserialize, PartialEq)]
#[serde(rename_all = "camelCase")]
pub struct JaegerAPIError {
    /// Numeric error code. `error_response` fills it with the internal status
    /// code cast to `i32`; `handle_get_operations` uses 400 when the
    /// `service` parameter is missing.
    pub code: i32,
    /// Human-readable error message.
    pub msg: String,
    /// Optional trace id the error relates to (`traceId` in JSON); omitted
    /// from the output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub trace_id: Option<String>,
}
87
/// Operation is an operation in a service.
///
/// Fields are serialized with camelCase names.
#[derive(Debug, Default, Serialize, Deserialize, PartialEq)]
#[serde(rename_all = "camelCase")]
pub struct Operation {
    /// Name of the operation.
    pub name: String,
    /// Optional span kind of the operation (`spanKind` in JSON); omitted from
    /// the output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub span_kind: Option<String>,
}
96
/// Trace is a collection of spans.
///
/// Fields are serialized with camelCase names unless renamed explicitly.
#[derive(Debug, Default, Serialize, Deserialize, PartialEq)]
#[serde(rename_all = "camelCase")]
pub struct Trace {
    /// Identifier of the trace, serialized as `traceID`.
    #[serde(rename = "traceID")]
    pub trace_id: String,
    /// Spans that belong to this trace.
    pub spans: Vec<Span>,

    /// Processes of this trace; omitted from the output when empty.
    /// NOTE(review): presumably keyed by process id; confirm against
    /// `traces_from_records`.
    #[serde(skip_serializing_if = "HashMap::is_empty")]
    pub processes: HashMap<String, Process>,

    /// Warnings attached to the trace; omitted from the output when empty.
    #[serde(skip_serializing_if = "Vec::is_empty")]
    pub warnings: Vec<String>,
}
111
/// Span is a single operation within a trace.
///
/// Fields are serialized with camelCase names unless renamed explicitly.
#[derive(Debug, Default, Serialize, Deserialize, PartialEq)]
#[serde(rename_all = "camelCase")]
pub struct Span {
    /// Identifier of the trace this span belongs to, serialized as `traceID`.
    #[serde(rename = "traceID")]
    pub trace_id: String,

    /// Identifier of this span, serialized as `spanID`.
    #[serde(rename = "spanID")]
    pub span_id: String,

    /// Identifier of the parent span, serialized as `parentSpanID`; omitted
    /// from the output when empty.
    #[serde(rename = "parentSpanID")]
    #[serde(skip_serializing_if = "String::is_empty")]
    pub parent_span_id: String,

    /// Optional span flags; omitted from the output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub flags: Option<u32>,

    /// Name of the operation (`operationName` in JSON).
    pub operation_name: String,
    /// References to other spans, e.g. a `CHILD_OF` link to the parent span.
    pub references: Vec<Reference>,
    pub start_time: u64, // microseconds since unix epoch
    pub duration: u64,   // microseconds
    /// Key-value tags attached to the span.
    pub tags: Vec<KeyValue>,
    /// Logs emitted within the span.
    pub logs: Vec<Log>,

    /// Identifier of the emitting process, serialized as `processID`; omitted
    /// from the output when empty.
    #[serde(rename = "processID")]
    #[serde(skip_serializing_if = "String::is_empty")]
    pub process_id: String,

    /// Optional inlined process; omitted from the output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub process: Option<Process>,

    /// Warnings attached to the span; omitted from the output when empty.
    #[serde(skip_serializing_if = "Vec::is_empty")]
    pub warnings: Vec<String>,
}
146
/// Reference is a reference from one span to another.
#[derive(Debug, Serialize, Deserialize, PartialEq)]
#[serde(rename_all = "camelCase")]
pub struct Reference {
    /// Trace id of the referenced span, serialized as `traceID`.
    #[serde(rename = "traceID")]
    pub trace_id: String,
    /// Span id of the referenced span, serialized as `spanID`.
    #[serde(rename = "spanID")]
    pub span_id: String,
    /// Kind of the reference (`refType` in JSON), e.g. `CHILD_OF`.
    pub ref_type: String,
}
157
/// Process is the process emitting a set of spans.
#[derive(Debug, Serialize, Deserialize, PartialEq)]
#[serde(rename_all = "camelCase")]
pub struct Process {
    /// Name of the service (`serviceName` in JSON).
    pub service_name: String,
    /// Key-value tags describing the process.
    pub tags: Vec<KeyValue>,
}
165
/// Log is a log emitted in a span.
#[derive(Debug, Serialize, Deserialize, PartialEq)]
#[serde(rename_all = "camelCase")]
pub struct Log {
    /// Time of the log entry. `traces_from_records` fills it with the span
    /// event time in microseconds since the unix epoch.
    pub timestamp: u64,
    /// Key-value fields of the log entry.
    pub fields: Vec<KeyValue>,
}
173
/// KeyValue is a key-value pair with typed value.
#[derive(Debug, Serialize, Deserialize, PartialEq)]
#[serde(rename_all = "camelCase")]
pub struct KeyValue {
    /// Name of the attribute.
    pub key: String,
    /// Declared type of `value`, serialized as `type`.
    #[serde(rename = "type")]
    pub value_type: ValueType,
    /// The value itself.
    pub value: Value,
}
183
/// Value is the value of a key-value pair in Jaeger Span attributes.
///
/// The enum is `untagged`: only the inner value is serialized, without a
/// variant name.
#[derive(Debug, Serialize, Deserialize, PartialEq)]
#[serde(untagged)]
#[serde(rename_all = "camelCase")]
pub enum Value {
    /// A string value.
    String(String),
    /// A signed 64-bit integer value.
    Int64(i64),
    /// A 64-bit floating point value.
    Float64(f64),
    /// A boolean value.
    Boolean(bool),
    /// A raw byte sequence.
    Binary(Vec<u8>),
}
195
/// ValueType is the type of a value stored in KeyValue struct.
///
/// Variants are serialized in lowercase: `string`, `int64`, `float64`,
/// `boolean` and `binary`.
#[derive(Debug, Serialize, Deserialize, PartialEq)]
#[serde(rename_all = "lowercase")]
pub enum ValueType {
    /// The value is a string.
    String,
    /// The value is a signed 64-bit integer.
    Int64,
    /// The value is a 64-bit float.
    Float64,
    /// The value is a boolean.
    Boolean,
    /// The value is a byte sequence.
    Binary,
}
206
/// JaegerQueryParams is the query parameters of Jaeger HTTP API.
///
/// Parameter names are camelCase in the query string (for example
/// `maxDuration`, `spanKind`) unless a field is renamed explicitly.
#[derive(Default, Debug, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct JaegerQueryParams {
    /// Service name of the trace. Passed as `service` in the query string.
    #[serde(rename = "service")]
    pub service_name: Option<String>,

    /// Operation name of the trace. Passed as `operation` in the query string.
    #[serde(rename = "operation")]
    pub operation_name: Option<String>,

    /// Limit the return data. An empty string is treated as not set.
    #[serde(default, deserialize_with = "empty_string_as_none")]
    pub limit: Option<usize>,

    /// Start time of the trace in microseconds since unix epoch.
    pub start: Option<i64>,

    /// End time of the trace in microseconds since unix epoch.
    pub end: Option<i64>,

    /// Max duration string value of the trace. Units can be `ns`, `us` (or `µs`), `ms`, `s`, `m`, `h`.
    /// An empty string is treated as not set.
    #[serde(default, deserialize_with = "empty_string_as_none")]
    pub max_duration: Option<String>,

    /// Min duration string value of the trace. Units can be `ns`, `us` (or `µs`), `ms`, `s`, `m`, `h`.
    /// An empty string is treated as not set.
    #[serde(default, deserialize_with = "empty_string_as_none")]
    pub min_duration: Option<String>,

    /// Tags of the trace in JSON format. It will be URL encoded in the raw query.
    /// The decoded format is like: tags="{\"http.status_code\":\"200\",\"latency\":\"11.234\",\"error\":\"false\",\"http.method\":\"GET\",\"http.path\":\"/api/v1/users\"}".
    /// The key and value of the map are both strings. The key and value is the attribute name and value of the span. The value will be converted to the corresponding type when querying.
    pub tags: Option<String>,

    /// The span kind of the trace.
    pub span_kind: Option<String>,
}
245
246/// Serde deserialization decorator to map empty Strings to None.
247fn empty_string_as_none<'de, D, T>(de: D) -> Result<Option<T>, D::Error>
248where
249    D: Deserializer<'de>,
250    T: FromStr,
251    T::Err: fmt::Display,
252{
253    let opt = Option::<String>::deserialize(de)?;
254    match opt.as_deref() {
255        None | Some("") => Ok(None),
256        Some(s) => FromStr::from_str(s).map_err(de::Error::custom).map(Some),
257    }
258}
259
260fn update_query_context(query_ctx: &mut QueryContext, table_name: Option<String>) {
261    // db should be already handled by middlewares
262    query_ctx.set_channel(Channel::Jaeger);
263    if let Some(table) = table_name {
264        query_ctx.set_extension(JAEGER_QUERY_TABLE_NAME_KEY, table);
265    }
266}
267
268impl QueryTraceParams {
269    fn from_jaeger_query_params(query_params: JaegerQueryParams) -> Result<Self> {
270        let mut internal_query_params: QueryTraceParams = QueryTraceParams {
271            service_name: query_params.service_name.context(InvalidJaegerQuerySnafu {
272                reason: "service_name is required".to_string(),
273            })?,
274            operation_name: query_params.operation_name,
275            // Convert start time from microseconds to nanoseconds.
276            start_time: query_params.start.map(|start| start * 1000),
277            end_time: query_params.end.map(|end| end * 1000),
278            ..Default::default()
279        };
280
281        if let Some(max_duration) = query_params.max_duration {
282            let duration = humantime::parse_duration(&max_duration).map_err(|e| {
283                InvalidJaegerQuerySnafu {
284                    reason: format!("parse maxDuration '{}' failed: {}", max_duration, e),
285                }
286                .build()
287            })?;
288            internal_query_params.max_duration = Some(duration.as_nanos() as u64);
289        }
290
291        if let Some(min_duration) = query_params.min_duration {
292            let duration = humantime::parse_duration(&min_duration).map_err(|e| {
293                InvalidJaegerQuerySnafu {
294                    reason: format!("parse minDuration '{}' failed: {}", min_duration, e),
295                }
296                .build()
297            })?;
298            internal_query_params.min_duration = Some(duration.as_nanos() as u64);
299        }
300
301        if let Some(tags) = query_params.tags {
302            // Serialize the tags to a JSON map.
303            let mut tags_map: HashMap<String, JsonValue> =
304                serde_json::from_str(&tags).map_err(|e| {
305                    InvalidJaegerQuerySnafu {
306                        reason: format!("parse tags '{}' failed: {}", tags, e),
307                    }
308                    .build()
309                })?;
310            for (_, v) in tags_map.iter_mut() {
311                if let Some(number) = convert_string_to_number(v) {
312                    *v = number;
313                }
314                if let Some(boolean) = convert_string_to_boolean(v) {
315                    *v = boolean;
316                }
317            }
318            internal_query_params.tags = Some(tags_map);
319        }
320
321        internal_query_params.limit = query_params.limit;
322
323        Ok(internal_query_params)
324    }
325}
326
/// Internal parameters of a trace search, built from `JaegerQueryParams` by
/// `QueryTraceParams::from_jaeger_query_params`.
#[derive(Debug, Default, PartialEq)]
pub struct QueryTraceParams {
    /// Name of the service to search traces for (required).
    pub service_name: String,
    /// Optional operation name to filter by.
    pub operation_name: Option<String>,

    /// The limit of the number of traces to return.
    pub limit: Option<usize>,

    /// Select the traces with the given tags(span attributes).
    pub tags: Option<HashMap<String, JsonValue>>,

    // The unit of the following time related parameters is nanoseconds.
    /// Start of the time range, in nanoseconds.
    pub start_time: Option<i64>,
    /// End of the time range, in nanoseconds.
    pub end_time: Option<i64>,
    /// Minimum duration, in nanoseconds.
    pub min_duration: Option<u64>,
    /// Maximum duration, in nanoseconds.
    pub max_duration: Option<u64>,
}
344
345/// Handle the GET `/api/services` request.
346#[axum_macros::debug_handler]
347#[tracing::instrument(skip_all, fields(protocol = "jaeger", request_type = "get_services"))]
348pub async fn handle_get_services(
349    State(handler): State<JaegerQueryHandlerRef>,
350    Query(query_params): Query<JaegerQueryParams>,
351    Extension(mut query_ctx): Extension<QueryContext>,
352    TraceTableName(table_name): TraceTableName,
353) -> impl IntoResponse {
354    debug!(
355        "Received Jaeger '/api/services' request, query_params: {:?}, query_ctx: {:?}",
356        query_params, query_ctx
357    );
358
359    query_ctx.set_channel(Channel::Jaeger);
360    if let Some(table) = table_name {
361        query_ctx.set_extension(JAEGER_QUERY_TABLE_NAME_KEY, table);
362    }
363
364    let query_ctx = Arc::new(query_ctx);
365    let db = query_ctx.get_db_string();
366
367    // Record the query time histogram.
368    let _timer = METRIC_JAEGER_QUERY_ELAPSED
369        .with_label_values(&[&db, "/api/services"])
370        .start_timer();
371
372    match handler.get_services(query_ctx).await {
373        Ok(output) => match covert_to_records(output).await {
374            Ok(Some(records)) => match services_from_records(records) {
375                Ok(services) => {
376                    let services_num = services.len();
377                    (
378                        HttpStatusCode::OK,
379                        axum::Json(JaegerAPIResponse {
380                            data: Some(JaegerData::ServiceNames(services)),
381                            total: services_num,
382                            ..Default::default()
383                        }),
384                    )
385                }
386                Err(err) => {
387                    error!("Failed to get services: {:?}", err);
388                    error_response(err)
389                }
390            },
391            Ok(None) => (HttpStatusCode::OK, axum::Json(JaegerAPIResponse::default())),
392            Err(err) => {
393                error!("Failed to get services: {:?}", err);
394                error_response(err)
395            }
396        },
397        Err(err) => handle_query_error(err, "Failed to get services", &db),
398    }
399}
400
401/// Handle the GET `/api/traces/{trace_id}` request.
402#[axum_macros::debug_handler]
403#[tracing::instrument(skip_all, fields(protocol = "jaeger", request_type = "get_trace"))]
404pub async fn handle_get_trace(
405    State(handler): State<JaegerQueryHandlerRef>,
406    Path(trace_id): Path<String>,
407    Query(query_params): Query<JaegerQueryParams>,
408    Extension(mut query_ctx): Extension<QueryContext>,
409    TraceTableName(table_name): TraceTableName,
410) -> impl IntoResponse {
411    debug!(
412        "Received Jaeger '/api/traces/{}' request, query_params: {:?}, query_ctx: {:?}",
413        trace_id, query_params, query_ctx
414    );
415
416    update_query_context(&mut query_ctx, table_name);
417    let query_ctx = Arc::new(query_ctx);
418    let db = query_ctx.get_db_string();
419
420    // Record the query time histogram.
421    let _timer = METRIC_JAEGER_QUERY_ELAPSED
422        .with_label_values(&[&db, "/api/traces"])
423        .start_timer();
424
425    // Convert start time and end time from microseconds to nanoseconds.
426    let start_time_ns = query_params.start.map(|start_us| start_us * 1000);
427    let end_time_ns = query_params.end.map(|end_us| end_us * 1000);
428
429    let output = match handler
430        .get_trace(query_ctx, &trace_id, start_time_ns, end_time_ns)
431        .await
432    {
433        Ok(output) => output,
434        Err(err) => {
435            return handle_query_error(
436                err,
437                &format!("Failed to get trace for '{}'", trace_id),
438                &db,
439            );
440        }
441    };
442
443    match covert_to_records(output).await {
444        Ok(Some(records)) => match traces_from_records(records) {
445            Ok(traces) => (
446                HttpStatusCode::OK,
447                axum::Json(JaegerAPIResponse {
448                    data: Some(JaegerData::Traces(traces)),
449                    ..Default::default()
450                }),
451            ),
452            Err(err) => {
453                error!("Failed to get trace '{}': {:?}", trace_id, err);
454                error_response(err)
455            }
456        },
457        Ok(None) => (HttpStatusCode::OK, axum::Json(JaegerAPIResponse::default())),
458        Err(err) => {
459            error!("Failed to get trace '{}': {:?}", trace_id, err);
460            error_response(err)
461        }
462    }
463}
464
465/// Handle the GET `/api/traces` request.
466#[axum_macros::debug_handler]
467#[tracing::instrument(skip_all, fields(protocol = "jaeger", request_type = "find_traces"))]
468pub async fn handle_find_traces(
469    State(handler): State<JaegerQueryHandlerRef>,
470    Query(query_params): Query<JaegerQueryParams>,
471    Extension(mut query_ctx): Extension<QueryContext>,
472    TraceTableName(table_name): TraceTableName,
473) -> impl IntoResponse {
474    debug!(
475        "Received Jaeger '/api/traces' request, query_params: {:?}, query_ctx: {:?}",
476        query_params, query_ctx
477    );
478
479    update_query_context(&mut query_ctx, table_name);
480    let query_ctx = Arc::new(query_ctx);
481    let db = query_ctx.get_db_string();
482
483    // Record the query time histogram.
484    let _timer = METRIC_JAEGER_QUERY_ELAPSED
485        .with_label_values(&[&db, "/api/traces"])
486        .start_timer();
487
488    match QueryTraceParams::from_jaeger_query_params(query_params) {
489        Ok(query_params) => {
490            let output = handler.find_traces(query_ctx, query_params).await;
491            match output {
492                Ok(output) => match covert_to_records(output).await {
493                    Ok(Some(records)) => match traces_from_records(records) {
494                        Ok(traces) => (
495                            HttpStatusCode::OK,
496                            axum::Json(JaegerAPIResponse {
497                                data: Some(JaegerData::Traces(traces)),
498                                ..Default::default()
499                            }),
500                        ),
501                        Err(err) => {
502                            error!("Failed to find traces: {:?}", err);
503                            error_response(err)
504                        }
505                    },
506                    Ok(None) => (HttpStatusCode::OK, axum::Json(JaegerAPIResponse::default())),
507                    Err(err) => error_response(err),
508                },
509                Err(err) => handle_query_error(err, "Failed to find traces", &db),
510            }
511        }
512        Err(e) => error_response(e),
513    }
514}
515
516/// Handle the GET `/api/operations` request.
517#[axum_macros::debug_handler]
518#[tracing::instrument(skip_all, fields(protocol = "jaeger", request_type = "get_operations"))]
519pub async fn handle_get_operations(
520    State(handler): State<JaegerQueryHandlerRef>,
521    Query(query_params): Query<JaegerQueryParams>,
522    Extension(mut query_ctx): Extension<QueryContext>,
523    TraceTableName(table_name): TraceTableName,
524    headers: HeaderMap,
525) -> impl IntoResponse {
526    debug!(
527        "Received Jaeger '/api/operations' request, query_params: {:?}, query_ctx: {:?}, headers: {:?}",
528        query_params, query_ctx, headers
529    );
530
531    let (start, end) = match parse_jaeger_time_range_for_operations(&headers, &query_params) {
532        Ok((start, end)) => (start, end),
533        Err(e) => return error_response(e),
534    };
535
536    debug!("Get operations with start: {:?}, end: {:?}", start, end);
537
538    if let Some(service_name) = &query_params.service_name {
539        update_query_context(&mut query_ctx, table_name);
540        let query_ctx = Arc::new(query_ctx);
541        let db = query_ctx.get_db_string();
542
543        // Record the query time histogram.
544        let _timer = METRIC_JAEGER_QUERY_ELAPSED
545            .with_label_values(&[&db, "/api/operations"])
546            .start_timer();
547
548        match handler
549            .get_operations(
550                query_ctx,
551                service_name,
552                query_params.span_kind.as_deref(),
553                start,
554                end,
555            )
556            .await
557        {
558            Ok(output) => match covert_to_records(output).await {
559                Ok(Some(records)) => match operations_from_records(records, true) {
560                    Ok(operations) => {
561                        let total = operations.len();
562                        (
563                            HttpStatusCode::OK,
564                            axum::Json(JaegerAPIResponse {
565                                data: Some(JaegerData::Operations(operations)),
566                                total,
567                                ..Default::default()
568                            }),
569                        )
570                    }
571                    Err(err) => {
572                        error!("Failed to get operations: {:?}", err);
573                        error_response(err)
574                    }
575                },
576                Ok(None) => (HttpStatusCode::OK, axum::Json(JaegerAPIResponse::default())),
577                Err(err) => error_response(err),
578            },
579            Err(err) => handle_query_error(
580                err,
581                &format!("Failed to get operations for service '{}'", service_name),
582                &db,
583            ),
584        }
585    } else {
586        (
587            HttpStatusCode::BAD_REQUEST,
588            axum::Json(JaegerAPIResponse {
589                errors: vec![JaegerAPIError {
590                    code: 400,
591                    msg: "parameter 'service' is required".to_string(),
592                    trace_id: None,
593                }],
594                ..Default::default()
595            }),
596        )
597    }
598}
599
600/// Handle the GET `/api/services/{service_name}/operations` request.
601#[axum_macros::debug_handler]
602#[tracing::instrument(
603    skip_all,
604    fields(protocol = "jaeger", request_type = "get_operations_by_service")
605)]
606pub async fn handle_get_operations_by_service(
607    State(handler): State<JaegerQueryHandlerRef>,
608    Path(service_name): Path<String>,
609    Query(query_params): Query<JaegerQueryParams>,
610    Extension(mut query_ctx): Extension<QueryContext>,
611    TraceTableName(table_name): TraceTableName,
612    headers: HeaderMap,
613) -> impl IntoResponse {
614    debug!(
615        "Received Jaeger '/api/services/{}/operations' request, query_params: {:?}, query_ctx: {:?}, headers: {:?}",
616        service_name, query_params, query_ctx, headers
617    );
618
619    update_query_context(&mut query_ctx, table_name);
620    let query_ctx = Arc::new(query_ctx);
621    let db = query_ctx.get_db_string();
622
623    // Record the query time histogram.
624    let _timer = METRIC_JAEGER_QUERY_ELAPSED
625        .with_label_values(&[&db, "/api/services"])
626        .start_timer();
627
628    let (start, end) = match parse_jaeger_time_range_for_operations(&headers, &query_params) {
629        Ok((start, end)) => (start, end),
630        Err(e) => return error_response(e),
631    };
632
633    match handler
634        .get_operations(query_ctx, &service_name, None, start, end)
635        .await
636    {
637        Ok(output) => match covert_to_records(output).await {
638            Ok(Some(records)) => match operations_from_records(records, false) {
639                Ok(operations) => {
640                    let operations: Vec<String> =
641                        operations.into_iter().map(|op| op.name).collect();
642                    let total = operations.len();
643                    (
644                        HttpStatusCode::OK,
645                        axum::Json(JaegerAPIResponse {
646                            data: Some(JaegerData::OperationsNames(operations)),
647                            total,
648                            ..Default::default()
649                        }),
650                    )
651                }
652                Err(err) => {
653                    error!(
654                        "Failed to get operations for service '{}': {:?}",
655                        service_name, err
656                    );
657                    error_response(err)
658                }
659            },
660            Ok(None) => (HttpStatusCode::OK, axum::Json(JaegerAPIResponse::default())),
661            Err(err) => error_response(err),
662        },
663        Err(err) => handle_query_error(
664            err,
665            &format!("Failed to get operations for service '{}'", service_name),
666            &db,
667        ),
668    }
669}
670
671async fn covert_to_records(output: Output) -> Result<Option<HttpRecordsOutput>> {
672    match output.data {
673        OutputData::Stream(stream) => {
674            let records = HttpRecordsOutput::try_new(
675                stream.schema().clone(),
676                util::collect(stream)
677                    .await
678                    .context(CollectRecordbatchSnafu)?,
679            )?;
680            debug!("The query records: {:?}", records);
681            Ok(Some(records))
682        }
683        // It's unlikely to happen. However, if the output is not a stream, return None.
684        _ => Ok(None),
685    }
686}
687
688fn handle_query_error(
689    err: Error,
690    prompt: &str,
691    db: &str,
692) -> (HttpStatusCode, axum::Json<JaegerAPIResponse>) {
693    // To compatible with the Jaeger API, if the trace table is not found, return an empty response instead of an error.
694    if err.status_code() == StatusCode::TableNotFound {
695        warn!(
696            "No trace table '{}' found in database '{}'",
697            TRACE_TABLE_NAME, db
698        );
699        (HttpStatusCode::OK, axum::Json(JaegerAPIResponse::default()))
700    } else {
701        error!("{}: {:?}", prompt, err);
702        error_response(err)
703    }
704}
705
706fn error_response(err: Error) -> (HttpStatusCode, axum::Json<JaegerAPIResponse>) {
707    (
708        status_code_to_http_status(&err.status_code()),
709        axum::Json(JaegerAPIResponse {
710            errors: vec![JaegerAPIError {
711                code: err.status_code() as i32,
712                msg: err.to_string(),
713                ..Default::default()
714            }],
715            ..Default::default()
716        }),
717    )
718}
719
720fn traces_from_records(records: HttpRecordsOutput) -> Result<Vec<Trace>> {
721    // maintain the mapping: trace_id -> (process_id -> service_name).
722    let mut trace_id_to_processes: HashMap<String, HashMap<String, String>> = HashMap::new();
723    // maintain the mapping: trace_id -> spans.
724    let mut trace_id_to_spans: HashMap<String, Vec<Span>> = HashMap::new();
725    // maintain the mapping: service.name -> resource.attributes.
726    let mut service_to_resource_attributes: HashMap<String, Vec<KeyValue>> = HashMap::new();
727
728    let is_span_attributes_flatten = !records
729        .schema
730        .column_schemas
731        .iter()
732        .any(|c| c.name == SPAN_ATTRIBUTES_COLUMN);
733
734    for row in records.rows.into_iter() {
735        let mut span = Span::default();
736        let mut service_name = None;
737        let mut resource_tags = vec![];
738
739        for (idx, cell) in row.into_iter().enumerate() {
740            // safe to use index here
741            let column_name = &records.schema.column_schemas[idx].name;
742
743            match column_name.as_str() {
744                TRACE_ID_COLUMN => {
745                    if let JsonValue::String(trace_id) = cell {
746                        span.trace_id = trace_id.clone();
747                        trace_id_to_processes.entry(trace_id).or_default();
748                    }
749                }
750                TIMESTAMP_COLUMN => {
751                    span.start_time = cell.as_u64().context(InvalidJaegerQuerySnafu {
752                        reason: "Failed to convert timestamp to u64".to_string(),
753                    })? / 1000;
754                }
755                DURATION_NANO_COLUMN => {
756                    span.duration = cell.as_u64().context(InvalidJaegerQuerySnafu {
757                        reason: "Failed to convert duration to u64".to_string(),
758                    })? / 1000;
759                }
760                SERVICE_NAME_COLUMN => {
761                    if let JsonValue::String(name) = cell {
762                        service_name = Some(name);
763                    }
764                }
765                SPAN_NAME_COLUMN => {
766                    if let JsonValue::String(span_name) = cell {
767                        span.operation_name = span_name;
768                    }
769                }
770                SPAN_ID_COLUMN => {
771                    if let JsonValue::String(span_id) = cell {
772                        span.span_id = span_id;
773                    }
774                }
775                SPAN_ATTRIBUTES_COLUMN => {
776                    // for v0 data model, span_attributes are nested as a json
777                    // data structure
778                    if let JsonValue::Object(span_attrs) = cell {
779                        span.tags.extend(object_to_tags(span_attrs));
780                    }
781                }
782                RESOURCE_ATTRIBUTES_COLUMN => {
783                    // for v0 data model, resource_attributes are nested as a json
784                    // data structure
785
786                    if let JsonValue::Object(mut resource_attrs) = cell {
787                        resource_attrs.remove(KEY_SERVICE_NAME);
788                        resource_tags = object_to_tags(resource_attrs);
789                    }
790                }
791                PARENT_SPAN_ID_COLUMN => {
792                    if let JsonValue::String(parent_span_id) = cell {
793                        if !parent_span_id.is_empty() {
794                            span.references.push(Reference {
795                                trace_id: span.trace_id.clone(),
796                                span_id: parent_span_id,
797                                ref_type: REF_TYPE_CHILD_OF.to_string(),
798                            });
799                        }
800                    }
801                }
802                SPAN_EVENTS_COLUMN => {
803                    if let JsonValue::Array(events) = cell {
804                        for event in events {
805                            if let JsonValue::Object(mut obj) = event {
806                                let Some(action) = obj.get("name").and_then(|v| v.as_str()) else {
807                                    continue;
808                                };
809
810                                let Some(t) =
811                                    obj.get("time").and_then(|t| t.as_str()).and_then(|s| {
812                                        SPAN_KIND_TIME_FMTS
813                                            .iter()
814                                            .find_map(|fmt| {
815                                                chrono::DateTime::parse_from_str(s, fmt).ok()
816                                            })
817                                            .map(|dt| dt.timestamp_micros() as u64)
818                                    })
819                                else {
820                                    continue;
821                                };
822
823                                let mut fields = vec![KeyValue {
824                                    key: "event".to_string(),
825                                    value_type: ValueType::String,
826                                    value: Value::String(action.to_string()),
827                                }];
828
829                                // Add event attributes as fields
830                                if let Some(JsonValue::Object(attrs)) = obj.remove("attributes") {
831                                    fields.extend(object_to_tags(attrs));
832                                }
833
834                                span.logs.push(Log {
835                                    timestamp: t,
836                                    fields,
837                                });
838                            }
839                        }
840                    }
841                }
842                SCOPE_NAME_COLUMN => {
843                    if let JsonValue::String(scope_name) = cell {
844                        if !scope_name.is_empty() {
845                            span.tags.push(KeyValue {
846                                key: KEY_OTEL_SCOPE_NAME.to_string(),
847                                value_type: ValueType::String,
848                                value: Value::String(scope_name),
849                            });
850                        }
851                    }
852                }
853                SCOPE_VERSION_COLUMN => {
854                    if let JsonValue::String(scope_version) = cell {
855                        if !scope_version.is_empty() {
856                            span.tags.push(KeyValue {
857                                key: KEY_OTEL_SCOPE_VERSION.to_string(),
858                                value_type: ValueType::String,
859                                value: Value::String(scope_version),
860                            });
861                        }
862                    }
863                }
864                SPAN_KIND_COLUMN => {
865                    if let JsonValue::String(span_kind) = cell {
866                        if !span_kind.is_empty() {
867                            span.tags.push(KeyValue {
868                                key: KEY_SPAN_KIND.to_string(),
869                                value_type: ValueType::String,
870                                value: Value::String(normalize_span_kind(&span_kind)),
871                            });
872                        }
873                    }
874                }
875                SPAN_STATUS_CODE => {
876                    if let JsonValue::String(span_status) = cell {
877                        if span_status != SPAN_STATUS_UNSET && !span_status.is_empty() {
878                            span.tags.push(KeyValue {
879                                key: KEY_OTEL_STATUS_CODE.to_string(),
880                                value_type: ValueType::String,
881                                value: Value::String(normalize_status_code(&span_status)),
882                            });
883                        }
884                    }
885                }
886
887                _ => {
                    // this is the v1 data model
889                    if is_span_attributes_flatten {
890                        const SPAN_ATTR_PREFIX: &str = "span_attributes.";
891                        const RESOURCE_ATTR_PREFIX: &str = "resource_attributes.";
892                        // a span attributes column
893                        if column_name.starts_with(SPAN_ATTR_PREFIX) {
894                            if let Some(keyvalue) = to_keyvalue(
895                                column_name
896                                    .strip_prefix(SPAN_ATTR_PREFIX)
897                                    .unwrap_or_default()
898                                    .to_string(),
899                                cell,
900                            ) {
901                                span.tags.push(keyvalue);
902                            }
903                        } else if column_name.starts_with(RESOURCE_ATTR_PREFIX) {
904                            if let Some(keyvalue) = to_keyvalue(
905                                column_name
906                                    .strip_prefix(RESOURCE_ATTR_PREFIX)
907                                    .unwrap_or_default()
908                                    .to_string(),
909                                cell,
910                            ) {
911                                resource_tags.push(keyvalue);
912                            }
913                        }
914                    }
915                }
916            }
917        }
918
919        if let Some(service_name) = service_name {
920            if !service_to_resource_attributes.contains_key(&service_name) {
921                service_to_resource_attributes.insert(service_name.clone(), resource_tags);
922            }
923
924            if let Some(process) = trace_id_to_processes.get_mut(&span.trace_id) {
925                if let Some(process_id) = process.get(&service_name) {
926                    span.process_id = process_id.clone();
927                } else {
928                    // Allocate a new process id.
929                    let process_id = format!("p{}", process.len() + 1);
930                    process.insert(service_name, process_id.clone());
931                    span.process_id = process_id;
932                }
933            }
934        }
935
936        // ensure span tags order
937        span.tags.sort_by(|a, b| a.key.cmp(&b.key));
938
939        if let Some(spans) = trace_id_to_spans.get_mut(&span.trace_id) {
940            spans.push(span);
941        } else {
942            trace_id_to_spans.insert(span.trace_id.clone(), vec![span]);
943        }
944    }
945
946    let mut traces = Vec::new();
947    for (trace_id, spans) in trace_id_to_spans {
948        let mut trace = Trace {
949            trace_id,
950            spans,
951            ..Default::default()
952        };
953
954        if let Some(processes) = trace_id_to_processes.remove(&trace.trace_id) {
955            let mut process_id_to_process = HashMap::new();
956            for (service_name, process_id) in processes.into_iter() {
957                let tags = service_to_resource_attributes
958                    .remove(&service_name)
959                    .unwrap_or_default();
960                process_id_to_process.insert(process_id, Process { service_name, tags });
961            }
962            trace.processes = process_id_to_process;
963        }
964        traces.push(trace);
965    }
966
967    Ok(traces)
968}
969
970fn to_keyvalue(key: String, value: JsonValue) -> Option<KeyValue> {
971    match value {
972        JsonValue::String(value) => Some(KeyValue {
973            key,
974            value_type: ValueType::String,
975            value: Value::String(value.to_string()),
976        }),
977        JsonValue::Number(value) => Some(KeyValue {
978            key,
979            value_type: ValueType::Int64,
980            value: Value::Int64(value.as_i64().unwrap_or(0)),
981        }),
982        JsonValue::Bool(value) => Some(KeyValue {
983            key,
984            value_type: ValueType::Boolean,
985            value: Value::Boolean(value),
986        }),
987        JsonValue::Array(value) => Some(KeyValue {
988            key,
989            value_type: ValueType::String,
990            value: Value::String(serde_json::to_string(&value).unwrap()),
991        }),
992        JsonValue::Object(value) => Some(KeyValue {
993            key,
994            value_type: ValueType::String,
995            value: Value::String(serde_json::to_string(&value).unwrap()),
996        }),
997        JsonValue::Null => None,
998    }
999}
1000
1001fn object_to_tags(object: serde_json::map::Map<String, JsonValue>) -> Vec<KeyValue> {
1002    object
1003        .into_iter()
1004        .filter_map(|(key, value)| to_keyvalue(key, value))
1005        .collect()
1006}
1007
1008fn services_from_records(records: HttpRecordsOutput) -> Result<Vec<String>> {
1009    let expected_schema = vec![(SERVICE_NAME_COLUMN, "String")];
1010    check_schema(&records, &expected_schema)?;
1011
1012    let mut services = Vec::with_capacity(records.total_rows);
1013    for row in records.rows.into_iter() {
1014        for value in row.into_iter() {
1015            if let JsonValue::String(service_name) = value {
1016                services.push(service_name);
1017            }
1018        }
1019    }
1020    Ok(services)
1021}
1022
1023// Construct Jaeger operations from records.
1024fn operations_from_records(
1025    records: HttpRecordsOutput,
1026    contain_span_kind: bool,
1027) -> Result<Vec<Operation>> {
1028    let expected_schema = vec![(SPAN_NAME_COLUMN, "String"), (SPAN_KIND_COLUMN, "String")];
1029    check_schema(&records, &expected_schema)?;
1030
1031    let mut operations = Vec::with_capacity(records.total_rows);
1032    for row in records.rows.into_iter() {
1033        let mut row_iter = row.into_iter();
1034        if let Some(JsonValue::String(operation)) = row_iter.next() {
1035            let mut operation = Operation {
1036                name: operation,
1037                span_kind: None,
1038            };
1039            if contain_span_kind {
1040                if let Some(JsonValue::String(span_kind)) = row_iter.next() {
1041                    operation.span_kind = Some(normalize_span_kind(&span_kind));
1042                }
1043            } else {
1044                // skip span kind.
1045                row_iter.next();
1046            }
1047            operations.push(operation);
1048        }
1049    }
1050
1051    Ok(operations)
1052}
1053
1054// Check whether the schema of the records is correct.
1055fn check_schema(records: &HttpRecordsOutput, expected_schema: &[(&str, &str)]) -> Result<()> {
1056    for (i, column) in records.schema.column_schemas.iter().enumerate() {
1057        if column.name != expected_schema[i].0 || column.data_type != expected_schema[i].1 {
1058            InvalidJaegerQuerySnafu {
1059                reason: "query result schema is not correct".to_string(),
1060            }
1061            .fail()?
1062        }
1063    }
1064    Ok(())
1065}
1066
1067// By default, the span kind is stored as `SPAN_KIND_<kind>` in GreptimeDB.
1068// However, in Jaeger API, the span kind is returned as `<kind>` which is the lowercase of the span kind and without the `SPAN_KIND_` prefix.
1069fn normalize_span_kind(span_kind: &str) -> String {
1070    // If the span_kind starts with `SPAN_KIND_` prefix, remove it and convert to lowercase.
1071    if let Some(stripped) = span_kind.strip_prefix(SPAN_KIND_PREFIX) {
1072        stripped.to_lowercase()
1073    } else {
1074        // It's unlikely to happen. However, we still convert it to lowercase for consistency.
1075        span_kind.to_lowercase()
1076    }
1077}
1078
1079// By default, the status code is stored as `STATUS_CODE_<code>` in GreptimeDB.
1080// However, in Jaeger API, the status code is returned as `<code>` without the `STATUS_CODE_` prefix.
1081fn normalize_status_code(status_code: &str) -> String {
1082    // If the span_kind starts with `SPAN_KIND_` prefix, remove it and convert to lowercase.
1083    if let Some(stripped) = status_code.strip_prefix(SPAN_STATUS_PREFIX) {
1084        stripped.to_string()
1085    } else {
1086        // It's unlikely to happen
1087        status_code.to_string()
1088    }
1089}
1090
1091fn convert_string_to_number(input: &serde_json::Value) -> Option<serde_json::Value> {
1092    if let Some(data) = input.as_str() {
1093        if let Ok(number) = data.parse::<i64>() {
1094            return Some(serde_json::Value::Number(serde_json::Number::from(number)));
1095        }
1096        if let Ok(number) = data.parse::<f64>() {
1097            if let Some(number) = serde_json::Number::from_f64(number) {
1098                return Some(serde_json::Value::Number(number));
1099            }
1100        }
1101    }
1102
1103    None
1104}
1105
1106fn convert_string_to_boolean(input: &serde_json::Value) -> Option<serde_json::Value> {
1107    if let Some(data) = input.as_str() {
1108        if data == "true" {
1109            return Some(serde_json::Value::Bool(true));
1110        }
1111        if data == "false" {
1112            return Some(serde_json::Value::Bool(false));
1113        }
1114    }
1115
1116    None
1117}
1118
1119fn parse_jaeger_time_range_for_operations(
1120    headers: &HeaderMap,
1121    query_params: &JaegerQueryParams,
1122) -> Result<(Option<i64>, Option<i64>)> {
1123    if let Some(time_range) = headers.get(JAEGER_TIME_RANGE_FOR_OPERATIONS_HEADER) {
1124        match time_range.to_str() {
1125            Ok(time_range) => match humantime::parse_duration(time_range) {
1126                Ok(duration) => {
1127                    debug!(
1128                        "Get operations with time range: {:?}, duration: {:?}",
1129                        time_range, duration
1130                    );
1131                    let now = Utc::now().timestamp_micros();
1132                    Ok((Some(now - duration.as_micros() as i64), Some(now)))
1133                }
1134                Err(e) => {
1135                    error!("Failed to parse time range header: {:?}", e);
1136                    Err(InvalidJaegerQuerySnafu {
1137                        reason: format!("invalid time range header: {:?}", time_range),
1138                    }
1139                    .build())
1140                }
1141            },
1142            Err(e) => {
1143                error!("Failed to convert time range header to string: {:?}", e);
1144                Err(InvalidJaegerQuerySnafu {
1145                    reason: format!("invalid time range header: {:?}", time_range),
1146                }
1147                .build())
1148            }
1149        }
1150    } else {
1151        Ok((query_params.start, query_params.end))
1152    }
1153}
1154
1155#[cfg(test)]
1156mod tests {
1157    use serde_json::{json, Number, Value as JsonValue};
1158
1159    use super::*;
1160    use crate::http::{ColumnSchema, HttpRecordsOutput, OutputSchema};
1161
1162    #[test]
1163    fn test_services_from_records() {
1164        // The tests is the tuple of `(test_records, expected)`.
1165        let tests = vec![(
1166            HttpRecordsOutput {
1167                schema: OutputSchema {
1168                    column_schemas: vec![ColumnSchema {
1169                        name: "service_name".to_string(),
1170                        data_type: "String".to_string(),
1171                    }],
1172                },
1173                rows: vec![
1174                    vec![JsonValue::String("test-service-0".to_string())],
1175                    vec![JsonValue::String("test-service-1".to_string())],
1176                ],
1177                total_rows: 2,
1178                metrics: HashMap::new(),
1179            },
1180            vec!["test-service-0".to_string(), "test-service-1".to_string()],
1181        )];
1182
1183        for (records, expected) in tests {
1184            let services = services_from_records(records).unwrap();
1185            assert_eq!(services, expected);
1186        }
1187    }
1188
1189    #[test]
1190    fn test_operations_from_records() {
1191        // The tests is the tuple of `(test_records, contain_span_kind, expected)`.
1192        let tests = vec![
1193            (
1194                HttpRecordsOutput {
1195                    schema: OutputSchema {
1196                        column_schemas: vec![
1197                            ColumnSchema {
1198                                name: "span_name".to_string(),
1199                                data_type: "String".to_string(),
1200                            },
1201                            ColumnSchema {
1202                                name: "span_kind".to_string(),
1203                                data_type: "String".to_string(),
1204                            },
1205                        ],
1206                    },
1207                    rows: vec![
1208                        vec![
1209                            JsonValue::String("access-mysql".to_string()),
1210                            JsonValue::String("SPAN_KIND_SERVER".to_string()),
1211                        ],
1212                        vec![
1213                            JsonValue::String("access-redis".to_string()),
1214                            JsonValue::String("SPAN_KIND_CLIENT".to_string()),
1215                        ],
1216                    ],
1217                    total_rows: 2,
1218                    metrics: HashMap::new(),
1219                },
1220                false,
1221                vec![
1222                    Operation {
1223                        name: "access-mysql".to_string(),
1224                        span_kind: None,
1225                    },
1226                    Operation {
1227                        name: "access-redis".to_string(),
1228                        span_kind: None,
1229                    },
1230                ],
1231            ),
1232            (
1233                HttpRecordsOutput {
1234                    schema: OutputSchema {
1235                        column_schemas: vec![
1236                            ColumnSchema {
1237                                name: "span_name".to_string(),
1238                                data_type: "String".to_string(),
1239                            },
1240                            ColumnSchema {
1241                                name: "span_kind".to_string(),
1242                                data_type: "String".to_string(),
1243                            },
1244                        ],
1245                    },
1246                    rows: vec![
1247                        vec![
1248                            JsonValue::String("access-mysql".to_string()),
1249                            JsonValue::String("SPAN_KIND_SERVER".to_string()),
1250                        ],
1251                        vec![
1252                            JsonValue::String("access-redis".to_string()),
1253                            JsonValue::String("SPAN_KIND_CLIENT".to_string()),
1254                        ],
1255                    ],
1256                    total_rows: 2,
1257                    metrics: HashMap::new(),
1258                },
1259                true,
1260                vec![
1261                    Operation {
1262                        name: "access-mysql".to_string(),
1263                        span_kind: Some("server".to_string()),
1264                    },
1265                    Operation {
1266                        name: "access-redis".to_string(),
1267                        span_kind: Some("client".to_string()),
1268                    },
1269                ],
1270            ),
1271        ];
1272
1273        for (records, contain_span_kind, expected) in tests {
1274            let operations = operations_from_records(records, contain_span_kind).unwrap();
1275            assert_eq!(operations, expected);
1276        }
1277    }
1278
1279    #[test]
1280    fn test_traces_from_records() {
1281        // The tests is the tuple of `(test_records, expected)`.
1282        let tests = vec![(
1283            HttpRecordsOutput {
1284                schema: OutputSchema {
1285                    column_schemas: vec![
1286                        ColumnSchema {
1287                            name: "trace_id".to_string(),
1288                            data_type: "String".to_string(),
1289                        },
1290                        ColumnSchema {
1291                            name: "timestamp".to_string(),
1292                            data_type: "TimestampNanosecond".to_string(),
1293                        },
1294                        ColumnSchema {
1295                            name: "duration_nano".to_string(),
1296                            data_type: "UInt64".to_string(),
1297                        },
1298                        ColumnSchema {
1299                            name: "service_name".to_string(),
1300                            data_type: "String".to_string(),
1301                        },
1302                        ColumnSchema {
1303                            name: "span_name".to_string(),
1304                            data_type: "String".to_string(),
1305                        },
1306                        ColumnSchema {
1307                            name: "span_id".to_string(),
1308                            data_type: "String".to_string(),
1309                        },
1310                        ColumnSchema {
1311                            name: "span_attributes".to_string(),
1312                            data_type: "Json".to_string(),
1313                        },
1314                    ],
1315                },
1316                rows: vec![
1317                    vec![
1318                        JsonValue::String("5611dce1bc9ebed65352d99a027b08ea".to_string()),
1319                        JsonValue::Number(Number::from_u128(1738726754492422000).unwrap()),
1320                        JsonValue::Number(Number::from_u128(100000000).unwrap()),
1321                        JsonValue::String("test-service-0".to_string()),
1322                        JsonValue::String("access-mysql".to_string()),
1323                        JsonValue::String("008421dbbd33a3e9".to_string()),
1324                        JsonValue::Object(
1325                            json!({
1326                                "operation.type": "access-mysql",
1327                            })
1328                            .as_object()
1329                            .unwrap()
1330                            .clone(),
1331                        ),
1332                    ],
1333                    vec![
1334                        JsonValue::String("5611dce1bc9ebed65352d99a027b08ea".to_string()),
1335                        JsonValue::Number(Number::from_u128(1738726754642422000).unwrap()),
1336                        JsonValue::Number(Number::from_u128(100000000).unwrap()),
1337                        JsonValue::String("test-service-0".to_string()),
1338                        JsonValue::String("access-redis".to_string()),
1339                        JsonValue::String("ffa03416a7b9ea48".to_string()),
1340                        JsonValue::Object(
1341                            json!({
1342                                "operation.type": "access-redis",
1343                            })
1344                            .as_object()
1345                            .unwrap()
1346                            .clone(),
1347                        ),
1348                    ],
1349                ],
1350                total_rows: 2,
1351                metrics: HashMap::new(),
1352            },
1353            vec![Trace {
1354                trace_id: "5611dce1bc9ebed65352d99a027b08ea".to_string(),
1355                spans: vec![
1356                    Span {
1357                        trace_id: "5611dce1bc9ebed65352d99a027b08ea".to_string(),
1358                        span_id: "008421dbbd33a3e9".to_string(),
1359                        operation_name: "access-mysql".to_string(),
1360                        start_time: 1738726754492422,
1361                        duration: 100000,
1362                        tags: vec![KeyValue {
1363                            key: "operation.type".to_string(),
1364                            value_type: ValueType::String,
1365                            value: Value::String("access-mysql".to_string()),
1366                        }],
1367                        process_id: "p1".to_string(),
1368                        ..Default::default()
1369                    },
1370                    Span {
1371                        trace_id: "5611dce1bc9ebed65352d99a027b08ea".to_string(),
1372                        span_id: "ffa03416a7b9ea48".to_string(),
1373                        operation_name: "access-redis".to_string(),
1374                        start_time: 1738726754642422,
1375                        duration: 100000,
1376                        tags: vec![KeyValue {
1377                            key: "operation.type".to_string(),
1378                            value_type: ValueType::String,
1379                            value: Value::String("access-redis".to_string()),
1380                        }],
1381                        process_id: "p1".to_string(),
1382                        ..Default::default()
1383                    },
1384                ],
1385                processes: HashMap::from([(
1386                    "p1".to_string(),
1387                    Process {
1388                        service_name: "test-service-0".to_string(),
1389                        tags: vec![],
1390                    },
1391                )]),
1392                ..Default::default()
1393            }],
1394        )];
1395
1396        for (records, expected) in tests {
1397            let traces = traces_from_records(records).unwrap();
1398            assert_eq!(traces, expected);
1399        }
1400    }
1401
1402    #[test]
1403    fn test_traces_from_v1_records() {
1404        // The tests is the tuple of `(test_records, expected)`.
1405        let tests = vec![(
1406            HttpRecordsOutput {
1407                schema: OutputSchema {
1408                    column_schemas: vec![
1409                        ColumnSchema {
1410                            name: "trace_id".to_string(),
1411                            data_type: "String".to_string(),
1412                        },
1413                        ColumnSchema {
1414                            name: "timestamp".to_string(),
1415                            data_type: "TimestampNanosecond".to_string(),
1416                        },
1417                        ColumnSchema {
1418                            name: "duration_nano".to_string(),
1419                            data_type: "UInt64".to_string(),
1420                        },
1421                        ColumnSchema {
1422                            name: "service_name".to_string(),
1423                            data_type: "String".to_string(),
1424                        },
1425                        ColumnSchema {
1426                            name: "span_name".to_string(),
1427                            data_type: "String".to_string(),
1428                        },
1429                        ColumnSchema {
1430                            name: "span_id".to_string(),
1431                            data_type: "String".to_string(),
1432                        },
1433                        ColumnSchema {
1434                            name: "span_attributes.http.request.method".to_string(),
1435                            data_type: "String".to_string(),
1436                        },
1437                        ColumnSchema {
1438                            name: "span_attributes.http.request.url".to_string(),
1439                            data_type: "String".to_string(),
1440                        },
1441                        ColumnSchema {
1442                            name: "span_attributes.http.status_code".to_string(),
1443                            data_type: "UInt64".to_string(),
1444                        },
1445                    ],
1446                },
1447                rows: vec![
1448                    vec![
1449                        JsonValue::String("5611dce1bc9ebed65352d99a027b08ea".to_string()),
1450                        JsonValue::Number(Number::from_u128(1738726754492422000).unwrap()),
1451                        JsonValue::Number(Number::from_u128(100000000).unwrap()),
1452                        JsonValue::String("test-service-0".to_string()),
1453                        JsonValue::String("access-mysql".to_string()),
1454                        JsonValue::String("008421dbbd33a3e9".to_string()),
1455                        JsonValue::String("GET".to_string()),
1456                        JsonValue::String("/data".to_string()),
1457                        JsonValue::Number(Number::from_u128(200).unwrap()),
1458                    ],
1459                    vec![
1460                        JsonValue::String("5611dce1bc9ebed65352d99a027b08ea".to_string()),
1461                        JsonValue::Number(Number::from_u128(1738726754642422000).unwrap()),
1462                        JsonValue::Number(Number::from_u128(100000000).unwrap()),
1463                        JsonValue::String("test-service-0".to_string()),
1464                        JsonValue::String("access-redis".to_string()),
1465                        JsonValue::String("ffa03416a7b9ea48".to_string()),
1466                        JsonValue::String("POST".to_string()),
1467                        JsonValue::String("/create".to_string()),
1468                        JsonValue::Number(Number::from_u128(400).unwrap()),
1469                    ],
1470                ],
1471                total_rows: 2,
1472                metrics: HashMap::new(),
1473            },
1474            vec![Trace {
1475                trace_id: "5611dce1bc9ebed65352d99a027b08ea".to_string(),
1476                spans: vec![
1477                    Span {
1478                        trace_id: "5611dce1bc9ebed65352d99a027b08ea".to_string(),
1479                        span_id: "008421dbbd33a3e9".to_string(),
1480                        operation_name: "access-mysql".to_string(),
1481                        start_time: 1738726754492422,
1482                        duration: 100000,
1483                        tags: vec![
1484                            KeyValue {
1485                                key: "http.request.method".to_string(),
1486                                value_type: ValueType::String,
1487                                value: Value::String("GET".to_string()),
1488                            },
1489                            KeyValue {
1490                                key: "http.request.url".to_string(),
1491                                value_type: ValueType::String,
1492                                value: Value::String("/data".to_string()),
1493                            },
1494                            KeyValue {
1495                                key: "http.status_code".to_string(),
1496                                value_type: ValueType::Int64,
1497                                value: Value::Int64(200),
1498                            },
1499                        ],
1500                        process_id: "p1".to_string(),
1501                        ..Default::default()
1502                    },
1503                    Span {
1504                        trace_id: "5611dce1bc9ebed65352d99a027b08ea".to_string(),
1505                        span_id: "ffa03416a7b9ea48".to_string(),
1506                        operation_name: "access-redis".to_string(),
1507                        start_time: 1738726754642422,
1508                        duration: 100000,
1509                        tags: vec![
1510                            KeyValue {
1511                                key: "http.request.method".to_string(),
1512                                value_type: ValueType::String,
1513                                value: Value::String("POST".to_string()),
1514                            },
1515                            KeyValue {
1516                                key: "http.request.url".to_string(),
1517                                value_type: ValueType::String,
1518                                value: Value::String("/create".to_string()),
1519                            },
1520                            KeyValue {
1521                                key: "http.status_code".to_string(),
1522                                value_type: ValueType::Int64,
1523                                value: Value::Int64(400),
1524                            },
1525                        ],
1526                        process_id: "p1".to_string(),
1527                        ..Default::default()
1528                    },
1529                ],
1530                processes: HashMap::from([(
1531                    "p1".to_string(),
1532                    Process {
1533                        service_name: "test-service-0".to_string(),
1534                        tags: vec![],
1535                    },
1536                )]),
1537                ..Default::default()
1538            }],
1539        )];
1540
1541        for (records, expected) in tests {
1542            let traces = traces_from_records(records).unwrap();
1543            assert_eq!(traces, expected);
1544        }
1545    }
1546
1547    #[test]
1548    fn test_from_jaeger_query_params() {
1549        // The tests is the tuple of `(test_query_params, expected)`.
1550        let tests = vec![
1551            (
1552                JaegerQueryParams {
1553                    service_name: Some("test-service-0".to_string()),
1554                    ..Default::default()
1555                },
1556                QueryTraceParams {
1557                    service_name: "test-service-0".to_string(),
1558                    ..Default::default()
1559                },
1560            ),
1561            (
1562                JaegerQueryParams {
1563                    service_name: Some("test-service-0".to_string()),
1564                    operation_name: Some("access-mysql".to_string()),
1565                    start: Some(1738726754492422),
1566                    end: Some(1738726754642422),
1567                    max_duration: Some("100ms".to_string()),
1568                    min_duration: Some("50ms".to_string()),
1569                    limit: Some(10),
1570                    tags: Some("{\"http.status_code\":\"200\",\"latency\":\"11.234\",\"error\":\"false\",\"http.method\":\"GET\",\"http.path\":\"/api/v1/users\"}".to_string()),
1571                    ..Default::default()
1572                },
1573                QueryTraceParams {
1574                    service_name: "test-service-0".to_string(),
1575                    operation_name: Some("access-mysql".to_string()),
1576                    start_time: Some(1738726754492422000),
1577                    end_time: Some(1738726754642422000),
1578                    min_duration: Some(50000000),
1579                    max_duration: Some(100000000),
1580                    limit: Some(10),
1581                    tags: Some(HashMap::from([
1582                        ("http.status_code".to_string(), JsonValue::Number(Number::from(200))),
1583                        ("latency".to_string(), JsonValue::Number(Number::from_f64(11.234).unwrap())),
1584                        ("error".to_string(), JsonValue::Bool(false)),
1585                        ("http.method".to_string(), JsonValue::String("GET".to_string())),
1586                        ("http.path".to_string(), JsonValue::String("/api/v1/users".to_string())),
1587                    ])),
1588                },
1589            ),
1590        ];
1591
1592        for (query_params, expected) in tests {
1593            let query_params = QueryTraceParams::from_jaeger_query_params(query_params).unwrap();
1594            assert_eq!(query_params, expected);
1595        }
1596    }
1597
1598    #[test]
1599    fn test_check_schema() {
1600        // The tests is the tuple of `(test_records, expected_schema, is_ok)`.
1601        let tests = vec![(
1602            HttpRecordsOutput {
1603                schema: OutputSchema {
1604                    column_schemas: vec![
1605                        ColumnSchema {
1606                            name: "trace_id".to_string(),
1607                            data_type: "String".to_string(),
1608                        },
1609                        ColumnSchema {
1610                            name: "timestamp".to_string(),
1611                            data_type: "TimestampNanosecond".to_string(),
1612                        },
1613                        ColumnSchema {
1614                            name: "duration_nano".to_string(),
1615                            data_type: "UInt64".to_string(),
1616                        },
1617                        ColumnSchema {
1618                            name: "service_name".to_string(),
1619                            data_type: "String".to_string(),
1620                        },
1621                        ColumnSchema {
1622                            name: "span_name".to_string(),
1623                            data_type: "String".to_string(),
1624                        },
1625                        ColumnSchema {
1626                            name: "span_id".to_string(),
1627                            data_type: "String".to_string(),
1628                        },
1629                        ColumnSchema {
1630                            name: "span_attributes".to_string(),
1631                            data_type: "Json".to_string(),
1632                        },
1633                    ],
1634                },
1635                rows: vec![],
1636                total_rows: 0,
1637                metrics: HashMap::new(),
1638            },
1639            vec![
1640                (TRACE_ID_COLUMN, "String"),
1641                (TIMESTAMP_COLUMN, "TimestampNanosecond"),
1642                (DURATION_NANO_COLUMN, "UInt64"),
1643                (SERVICE_NAME_COLUMN, "String"),
1644                (SPAN_NAME_COLUMN, "String"),
1645                (SPAN_ID_COLUMN, "String"),
1646                (SPAN_ATTRIBUTES_COLUMN, "Json"),
1647            ],
1648            true,
1649        )];
1650
1651        for (records, expected_schema, is_ok) in tests {
1652            let result = check_schema(&records, &expected_schema);
1653            assert_eq!(result.is_ok(), is_ok);
1654        }
1655    }
1656
1657    #[test]
1658    fn test_normalize_span_kind() {
1659        let tests = vec![
1660            ("SPAN_KIND_SERVER".to_string(), "server".to_string()),
1661            ("SPAN_KIND_CLIENT".to_string(), "client".to_string()),
1662        ];
1663
1664        for (input, expected) in tests {
1665            let result = normalize_span_kind(&input);
1666            assert_eq!(result, expected);
1667        }
1668    }
1669
1670    #[test]
1671    fn test_convert_string_to_number() {
1672        let tests = vec![
1673            (
1674                JsonValue::String("123".to_string()),
1675                Some(JsonValue::Number(Number::from(123))),
1676            ),
1677            (
1678                JsonValue::String("123.456".to_string()),
1679                Some(JsonValue::Number(Number::from_f64(123.456).unwrap())),
1680            ),
1681        ];
1682
1683        for (input, expected) in tests {
1684            let result = convert_string_to_number(&input);
1685            assert_eq!(result, expected);
1686        }
1687    }
1688
1689    #[test]
1690    fn test_convert_string_to_boolean() {
1691        let tests = vec![
1692            (
1693                JsonValue::String("true".to_string()),
1694                Some(JsonValue::Bool(true)),
1695            ),
1696            (
1697                JsonValue::String("false".to_string()),
1698                Some(JsonValue::Bool(false)),
1699            ),
1700        ];
1701
1702        for (input, expected) in tests {
1703            let result = convert_string_to_boolean(&input);
1704            assert_eq!(result, expected);
1705        }
1706    }
1707}