servers/http/
jaeger.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::fmt;
17use std::str::FromStr;
18use std::sync::Arc;
19
20use axum::Extension;
21use axum::extract::{Path, Query, State};
22use axum::http::{HeaderMap, StatusCode as HttpStatusCode};
23use axum::response::IntoResponse;
24use chrono::Utc;
25use common_catalog::consts::{PARENT_SPAN_ID_COLUMN, TRACE_TABLE_NAME};
26use common_error::ext::ErrorExt;
27use common_error::status_code::StatusCode;
28use common_query::{Output, OutputData};
29use common_recordbatch::util;
30use common_telemetry::{debug, error, tracing, warn};
31use serde::{Deserialize, Deserializer, Serialize, de};
32use serde_json::Value as JsonValue;
33use session::context::{Channel, QueryContext};
34use snafu::{OptionExt, ResultExt};
35
36use crate::error::{
37    CollectRecordbatchSnafu, Error, InvalidJaegerQuerySnafu, Result, status_code_to_http_status,
38};
39use crate::http::HttpRecordsOutput;
40use crate::http::extractor::TraceTableName;
41use crate::metrics::METRIC_JAEGER_QUERY_ELAPSED;
42use crate::otlp::trace::{
43    DURATION_NANO_COLUMN, KEY_OTEL_SCOPE_NAME, KEY_OTEL_SCOPE_VERSION, KEY_OTEL_STATUS_CODE,
44    KEY_SERVICE_NAME, KEY_SPAN_KIND, RESOURCE_ATTRIBUTES_COLUMN, SCOPE_NAME_COLUMN,
45    SCOPE_VERSION_COLUMN, SERVICE_NAME_COLUMN, SPAN_ATTRIBUTES_COLUMN, SPAN_EVENTS_COLUMN,
46    SPAN_ID_COLUMN, SPAN_KIND_COLUMN, SPAN_KIND_PREFIX, SPAN_NAME_COLUMN, SPAN_STATUS_CODE,
47    SPAN_STATUS_PREFIX, SPAN_STATUS_UNSET, TIMESTAMP_COLUMN, TRACE_ID_COLUMN,
48};
49use crate::query_handler::JaegerQueryHandlerRef;
50
/// Query-context extension key under which the trace table name of a Jaeger request is stored.
pub const JAEGER_QUERY_TABLE_NAME_KEY: &str = "jaeger_query_table_name";

/// Jaeger reference type linking a span to its parent span.
const REF_TYPE_CHILD_OF: &str = "CHILD_OF";

/// `chrono` formats tried in order when parsing the `time` field of a span event:
/// 6-digit fractional seconds first, then 9-digit fractional seconds.
const SPAN_KIND_TIME_FMTS: [&str; 2] = [
    "%Y-%m-%d %H:%M:%S%.6f%z",
    "%Y-%m-%d %H:%M:%S%.9f%z",
];

/// Name of the HTTP header carrying a time range for operation lookups
/// (presumably read by `parse_jaeger_time_range_for_operations`).
pub const JAEGER_TIME_RANGE_FOR_OPERATIONS_HEADER: &str = "x-greptime-jaeger-query-time-range";
56
57/// JaegerAPIResponse is the response of Jaeger HTTP API.
58/// The original version is `structuredResponse` which is defined in https://github.com/jaegertracing/jaeger/blob/main/cmd/query/app/http_handler.go.
59#[derive(Default, Debug, Serialize, Deserialize, PartialEq)]
60pub struct JaegerAPIResponse {
61    pub data: Option<JaegerData>,
62    pub total: usize,
63    pub limit: usize,
64    pub offset: usize,
65    pub errors: Vec<JaegerAPIError>,
66}
67
68/// JaegerData is the query result of Jaeger HTTP API.
69#[derive(Debug, Serialize, Deserialize, PartialEq)]
70#[serde(untagged)]
71pub enum JaegerData {
72    ServiceNames(Vec<String>),
73    OperationsNames(Vec<String>),
74    Operations(Vec<Operation>),
75    Traces(Vec<Trace>),
76}
77
78/// JaegerAPIError is the error of Jaeger HTTP API.
79#[derive(Default, Debug, Serialize, Deserialize, PartialEq)]
80#[serde(rename_all = "camelCase")]
81pub struct JaegerAPIError {
82    pub code: i32,
83    pub msg: String,
84    #[serde(skip_serializing_if = "Option::is_none")]
85    pub trace_id: Option<String>,
86}
87
88/// Operation is an operation in a service.
89#[derive(Debug, Default, Serialize, Deserialize, PartialEq)]
90#[serde(rename_all = "camelCase")]
91pub struct Operation {
92    pub name: String,
93    #[serde(skip_serializing_if = "Option::is_none")]
94    pub span_kind: Option<String>,
95}
96
97/// Trace is a collection of spans.
98#[derive(Debug, Default, Serialize, Deserialize, PartialEq)]
99#[serde(rename_all = "camelCase")]
100pub struct Trace {
101    #[serde(rename = "traceID")]
102    pub trace_id: String,
103    pub spans: Vec<Span>,
104
105    #[serde(skip_serializing_if = "HashMap::is_empty")]
106    pub processes: HashMap<String, Process>,
107
108    #[serde(skip_serializing_if = "Vec::is_empty")]
109    pub warnings: Vec<String>,
110}
111
112/// Span is a single operation within a trace.
113#[derive(Debug, Default, Serialize, Deserialize, PartialEq)]
114#[serde(rename_all = "camelCase")]
115pub struct Span {
116    #[serde(rename = "traceID")]
117    pub trace_id: String,
118
119    #[serde(rename = "spanID")]
120    pub span_id: String,
121
122    #[serde(rename = "parentSpanID")]
123    #[serde(skip_serializing_if = "String::is_empty")]
124    pub parent_span_id: String,
125
126    #[serde(skip_serializing_if = "Option::is_none")]
127    pub flags: Option<u32>,
128
129    pub operation_name: String,
130    pub references: Vec<Reference>,
131    pub start_time: u64, // microseconds since unix epoch
132    pub duration: u64,   // microseconds
133    pub tags: Vec<KeyValue>,
134    pub logs: Vec<Log>,
135
136    #[serde(rename = "processID")]
137    #[serde(skip_serializing_if = "String::is_empty")]
138    pub process_id: String,
139
140    #[serde(skip_serializing_if = "Option::is_none")]
141    pub process: Option<Process>,
142
143    #[serde(skip_serializing_if = "Vec::is_empty")]
144    pub warnings: Vec<String>,
145}
146
147/// Reference is a reference from one span to another.
148#[derive(Debug, Serialize, Deserialize, PartialEq)]
149#[serde(rename_all = "camelCase")]
150pub struct Reference {
151    #[serde(rename = "traceID")]
152    pub trace_id: String,
153    #[serde(rename = "spanID")]
154    pub span_id: String,
155    pub ref_type: String,
156}
157
158/// Process is the process emitting a set of spans.
159#[derive(Debug, Serialize, Deserialize, PartialEq)]
160#[serde(rename_all = "camelCase")]
161pub struct Process {
162    pub service_name: String,
163    pub tags: Vec<KeyValue>,
164}
165
166/// Log is a log emitted in a span.
167#[derive(Debug, Serialize, Deserialize, PartialEq)]
168#[serde(rename_all = "camelCase")]
169pub struct Log {
170    pub timestamp: u64,
171    pub fields: Vec<KeyValue>,
172}
173
174/// KeyValue is a key-value pair with typed value.
175#[derive(Debug, Serialize, Deserialize, PartialEq)]
176#[serde(rename_all = "camelCase")]
177pub struct KeyValue {
178    pub key: String,
179    #[serde(rename = "type")]
180    pub value_type: ValueType,
181    pub value: Value,
182}
183
184/// Value is the value of a key-value pair in Jaeger Span attributes.
185#[derive(Debug, Serialize, Deserialize, PartialEq)]
186#[serde(untagged)]
187#[serde(rename_all = "camelCase")]
188pub enum Value {
189    String(String),
190    Int64(i64),
191    Float64(f64),
192    Boolean(bool),
193    Binary(Vec<u8>),
194}
195
196/// ValueType is the type of a value stored in KeyValue struct.
197#[derive(Debug, Serialize, Deserialize, PartialEq)]
198#[serde(rename_all = "lowercase")]
199pub enum ValueType {
200    String,
201    Int64,
202    Float64,
203    Boolean,
204    Binary,
205}
206
207/// JaegerQueryParams is the query parameters of Jaeger HTTP API.
208#[derive(Default, Debug, Serialize, Deserialize)]
209#[serde(rename_all = "camelCase")]
210pub struct JaegerQueryParams {
211    /// Service name of the trace.
212    #[serde(rename = "service")]
213    pub service_name: Option<String>,
214
215    /// Operation name of the trace.
216    #[serde(rename = "operation")]
217    pub operation_name: Option<String>,
218
219    /// Limit the return data.
220    #[serde(default, deserialize_with = "empty_string_as_none")]
221    pub limit: Option<usize>,
222
223    /// Start time of the trace in microseconds since unix epoch.
224    pub start: Option<i64>,
225
226    /// End time of the trace in microseconds since unix epoch.
227    pub end: Option<i64>,
228
229    /// Max duration string value of the trace. Units can be `ns`, `us` (or `µs`), `ms`, `s`, `m`, `h`.
230    #[serde(default, deserialize_with = "empty_string_as_none")]
231    pub max_duration: Option<String>,
232
233    /// Min duration string value of the trace. Units can be `ns`, `us` (or `µs`), `ms`, `s`, `m`, `h`.
234    #[serde(default, deserialize_with = "empty_string_as_none")]
235    pub min_duration: Option<String>,
236
237    /// Tags of the trace in JSON format. It will be URL encoded in the raw query.
238    /// The decoded format is like: tags="{\"http.status_code\":\"200\",\"latency\":\"11.234\",\"error\":\"false\",\"http.method\":\"GET\",\"http.path\":\"/api/v1/users\"}".
239    /// The key and value of the map are both strings. The key and value is the attribute name and value of the span. The value will be converted to the corresponding type when querying.
240    pub tags: Option<String>,
241
242    /// The span kind of the trace.
243    pub span_kind: Option<String>,
244}
245
246/// Serde deserialization decorator to map empty Strings to None.
247fn empty_string_as_none<'de, D, T>(de: D) -> Result<Option<T>, D::Error>
248where
249    D: Deserializer<'de>,
250    T: FromStr,
251    T::Err: fmt::Display,
252{
253    let opt = Option::<String>::deserialize(de)?;
254    match opt.as_deref() {
255        None | Some("") => Ok(None),
256        Some(s) => FromStr::from_str(s).map_err(de::Error::custom).map(Some),
257    }
258}
259
260fn update_query_context(query_ctx: &mut QueryContext, table_name: Option<String>) {
261    // db should be already handled by middlewares
262    query_ctx.set_channel(Channel::Jaeger);
263    if let Some(table) = table_name {
264        query_ctx.set_extension(JAEGER_QUERY_TABLE_NAME_KEY, table);
265    }
266}
267
268impl QueryTraceParams {
269    fn from_jaeger_query_params(query_params: JaegerQueryParams) -> Result<Self> {
270        let mut internal_query_params: QueryTraceParams = QueryTraceParams {
271            service_name: query_params.service_name.context(InvalidJaegerQuerySnafu {
272                reason: "service_name is required".to_string(),
273            })?,
274            operation_name: query_params.operation_name,
275            // Convert start time from microseconds to nanoseconds.
276            start_time: query_params.start.map(|start| start * 1000),
277            end_time: query_params.end.map(|end| end * 1000),
278            ..Default::default()
279        };
280
281        if let Some(max_duration) = query_params.max_duration {
282            let duration = humantime::parse_duration(&max_duration).map_err(|e| {
283                InvalidJaegerQuerySnafu {
284                    reason: format!("parse maxDuration '{}' failed: {}", max_duration, e),
285                }
286                .build()
287            })?;
288            internal_query_params.max_duration = Some(duration.as_nanos() as u64);
289        }
290
291        if let Some(min_duration) = query_params.min_duration {
292            let duration = humantime::parse_duration(&min_duration).map_err(|e| {
293                InvalidJaegerQuerySnafu {
294                    reason: format!("parse minDuration '{}' failed: {}", min_duration, e),
295                }
296                .build()
297            })?;
298            internal_query_params.min_duration = Some(duration.as_nanos() as u64);
299        }
300
301        if let Some(tags) = query_params.tags {
302            // Serialize the tags to a JSON map.
303            let mut tags_map: HashMap<String, JsonValue> =
304                serde_json::from_str(&tags).map_err(|e| {
305                    InvalidJaegerQuerySnafu {
306                        reason: format!("parse tags '{}' failed: {}", tags, e),
307                    }
308                    .build()
309                })?;
310            for (_, v) in tags_map.iter_mut() {
311                if let Some(number) = convert_string_to_number(v) {
312                    *v = number;
313                }
314                if let Some(boolean) = convert_string_to_boolean(v) {
315                    *v = boolean;
316                }
317            }
318            internal_query_params.tags = Some(tags_map);
319        }
320
321        internal_query_params.limit = query_params.limit;
322
323        Ok(internal_query_params)
324    }
325}
326
327#[derive(Debug, Default, PartialEq)]
328pub struct QueryTraceParams {
329    pub service_name: String,
330    pub operation_name: Option<String>,
331
332    // The limit of the number of traces to return.
333    pub limit: Option<usize>,
334
335    // Select the traces with the given tags(span attributes).
336    pub tags: Option<HashMap<String, JsonValue>>,
337
338    // The unit of the following time related parameters is nanoseconds.
339    pub start_time: Option<i64>,
340    pub end_time: Option<i64>,
341    pub min_duration: Option<u64>,
342    pub max_duration: Option<u64>,
343}
344
345/// Handle the GET `/api/services` request.
346#[axum_macros::debug_handler]
347#[tracing::instrument(skip_all, fields(protocol = "jaeger", request_type = "get_services"))]
348pub async fn handle_get_services(
349    State(handler): State<JaegerQueryHandlerRef>,
350    Query(query_params): Query<JaegerQueryParams>,
351    Extension(mut query_ctx): Extension<QueryContext>,
352    TraceTableName(table_name): TraceTableName,
353) -> impl IntoResponse {
354    debug!(
355        "Received Jaeger '/api/services' request, query_params: {:?}, query_ctx: {:?}",
356        query_params, query_ctx
357    );
358
359    query_ctx.set_channel(Channel::Jaeger);
360    if let Some(table) = table_name {
361        query_ctx.set_extension(JAEGER_QUERY_TABLE_NAME_KEY, table);
362    }
363
364    let query_ctx = Arc::new(query_ctx);
365    let db = query_ctx.get_db_string();
366
367    // Record the query time histogram.
368    let _timer = METRIC_JAEGER_QUERY_ELAPSED
369        .with_label_values(&[&db, "/api/services"])
370        .start_timer();
371
372    match handler.get_services(query_ctx).await {
373        Ok(output) => match covert_to_records(output).await {
374            Ok(Some(records)) => match services_from_records(records) {
375                Ok(services) => {
376                    let services_num = services.len();
377                    (
378                        HttpStatusCode::OK,
379                        axum::Json(JaegerAPIResponse {
380                            data: Some(JaegerData::ServiceNames(services)),
381                            total: services_num,
382                            ..Default::default()
383                        }),
384                    )
385                }
386                Err(err) => {
387                    error!("Failed to get services: {:?}", err);
388                    error_response(err)
389                }
390            },
391            Ok(None) => (HttpStatusCode::OK, axum::Json(JaegerAPIResponse::default())),
392            Err(err) => {
393                error!("Failed to get services: {:?}", err);
394                error_response(err)
395            }
396        },
397        Err(err) => handle_query_error(err, "Failed to get services", &db),
398    }
399}
400
401/// Handle the GET `/api/traces/{trace_id}` request.
402#[axum_macros::debug_handler]
403#[tracing::instrument(skip_all, fields(protocol = "jaeger", request_type = "get_trace"))]
404pub async fn handle_get_trace(
405    State(handler): State<JaegerQueryHandlerRef>,
406    Path(trace_id): Path<String>,
407    Query(query_params): Query<JaegerQueryParams>,
408    Extension(mut query_ctx): Extension<QueryContext>,
409    TraceTableName(table_name): TraceTableName,
410) -> impl IntoResponse {
411    debug!(
412        "Received Jaeger '/api/traces/{}' request, query_params: {:?}, query_ctx: {:?}",
413        trace_id, query_params, query_ctx
414    );
415
416    update_query_context(&mut query_ctx, table_name);
417    let query_ctx = Arc::new(query_ctx);
418    let db = query_ctx.get_db_string();
419
420    // Record the query time histogram.
421    let _timer = METRIC_JAEGER_QUERY_ELAPSED
422        .with_label_values(&[&db, "/api/traces"])
423        .start_timer();
424
425    // Convert start time and end time from microseconds to nanoseconds.
426    let start_time_ns = query_params.start.map(|start_us| start_us * 1000);
427    let end_time_ns = query_params.end.map(|end_us| end_us * 1000);
428
429    let output = match handler
430        .get_trace(query_ctx, &trace_id, start_time_ns, end_time_ns)
431        .await
432    {
433        Ok(output) => output,
434        Err(err) => {
435            return handle_query_error(
436                err,
437                &format!("Failed to get trace for '{}'", trace_id),
438                &db,
439            );
440        }
441    };
442
443    match covert_to_records(output).await {
444        Ok(Some(records)) => match traces_from_records(records) {
445            Ok(traces) => (
446                HttpStatusCode::OK,
447                axum::Json(JaegerAPIResponse {
448                    data: Some(JaegerData::Traces(traces)),
449                    ..Default::default()
450                }),
451            ),
452            Err(err) => {
453                error!("Failed to get trace '{}': {:?}", trace_id, err);
454                error_response(err)
455            }
456        },
457        Ok(None) => (HttpStatusCode::OK, axum::Json(JaegerAPIResponse::default())),
458        Err(err) => {
459            error!("Failed to get trace '{}': {:?}", trace_id, err);
460            error_response(err)
461        }
462    }
463}
464
465/// Handle the GET `/api/traces` request.
466#[axum_macros::debug_handler]
467#[tracing::instrument(skip_all, fields(protocol = "jaeger", request_type = "find_traces"))]
468pub async fn handle_find_traces(
469    State(handler): State<JaegerQueryHandlerRef>,
470    Query(query_params): Query<JaegerQueryParams>,
471    Extension(mut query_ctx): Extension<QueryContext>,
472    TraceTableName(table_name): TraceTableName,
473) -> impl IntoResponse {
474    debug!(
475        "Received Jaeger '/api/traces' request, query_params: {:?}, query_ctx: {:?}",
476        query_params, query_ctx
477    );
478
479    update_query_context(&mut query_ctx, table_name);
480    let query_ctx = Arc::new(query_ctx);
481    let db = query_ctx.get_db_string();
482
483    // Record the query time histogram.
484    let _timer = METRIC_JAEGER_QUERY_ELAPSED
485        .with_label_values(&[&db, "/api/traces"])
486        .start_timer();
487
488    match QueryTraceParams::from_jaeger_query_params(query_params) {
489        Ok(query_params) => {
490            let output = handler.find_traces(query_ctx, query_params).await;
491            match output {
492                Ok(output) => match covert_to_records(output).await {
493                    Ok(Some(records)) => match traces_from_records(records) {
494                        Ok(traces) => (
495                            HttpStatusCode::OK,
496                            axum::Json(JaegerAPIResponse {
497                                data: Some(JaegerData::Traces(traces)),
498                                ..Default::default()
499                            }),
500                        ),
501                        Err(err) => {
502                            error!("Failed to find traces: {:?}", err);
503                            error_response(err)
504                        }
505                    },
506                    Ok(None) => (HttpStatusCode::OK, axum::Json(JaegerAPIResponse::default())),
507                    Err(err) => error_response(err),
508                },
509                Err(err) => handle_query_error(err, "Failed to find traces", &db),
510            }
511        }
512        Err(e) => error_response(e),
513    }
514}
515
516/// Handle the GET `/api/operations` request.
517#[axum_macros::debug_handler]
518#[tracing::instrument(skip_all, fields(protocol = "jaeger", request_type = "get_operations"))]
519pub async fn handle_get_operations(
520    State(handler): State<JaegerQueryHandlerRef>,
521    Query(query_params): Query<JaegerQueryParams>,
522    Extension(mut query_ctx): Extension<QueryContext>,
523    TraceTableName(table_name): TraceTableName,
524    headers: HeaderMap,
525) -> impl IntoResponse {
526    debug!(
527        "Received Jaeger '/api/operations' request, query_params: {:?}, query_ctx: {:?}, headers: {:?}",
528        query_params, query_ctx, headers
529    );
530
531    let (start, end) = match parse_jaeger_time_range_for_operations(&headers, &query_params) {
532        Ok((start, end)) => (start, end),
533        Err(e) => return error_response(e),
534    };
535
536    debug!("Get operations with start: {:?}, end: {:?}", start, end);
537
538    if let Some(service_name) = &query_params.service_name {
539        update_query_context(&mut query_ctx, table_name);
540        let query_ctx = Arc::new(query_ctx);
541        let db = query_ctx.get_db_string();
542
543        // Record the query time histogram.
544        let _timer = METRIC_JAEGER_QUERY_ELAPSED
545            .with_label_values(&[&db, "/api/operations"])
546            .start_timer();
547
548        match handler
549            .get_operations(
550                query_ctx,
551                service_name,
552                query_params.span_kind.as_deref(),
553                start,
554                end,
555            )
556            .await
557        {
558            Ok(output) => match covert_to_records(output).await {
559                Ok(Some(records)) => match operations_from_records(records, true) {
560                    Ok(operations) => {
561                        let total = operations.len();
562                        (
563                            HttpStatusCode::OK,
564                            axum::Json(JaegerAPIResponse {
565                                data: Some(JaegerData::Operations(operations)),
566                                total,
567                                ..Default::default()
568                            }),
569                        )
570                    }
571                    Err(err) => {
572                        error!("Failed to get operations: {:?}", err);
573                        error_response(err)
574                    }
575                },
576                Ok(None) => (HttpStatusCode::OK, axum::Json(JaegerAPIResponse::default())),
577                Err(err) => error_response(err),
578            },
579            Err(err) => handle_query_error(
580                err,
581                &format!("Failed to get operations for service '{}'", service_name),
582                &db,
583            ),
584        }
585    } else {
586        (
587            HttpStatusCode::BAD_REQUEST,
588            axum::Json(JaegerAPIResponse {
589                errors: vec![JaegerAPIError {
590                    code: 400,
591                    msg: "parameter 'service' is required".to_string(),
592                    trace_id: None,
593                }],
594                ..Default::default()
595            }),
596        )
597    }
598}
599
600/// Handle the GET `/api/services/{service_name}/operations` request.
601#[axum_macros::debug_handler]
602#[tracing::instrument(
603    skip_all,
604    fields(protocol = "jaeger", request_type = "get_operations_by_service")
605)]
606pub async fn handle_get_operations_by_service(
607    State(handler): State<JaegerQueryHandlerRef>,
608    Path(service_name): Path<String>,
609    Query(query_params): Query<JaegerQueryParams>,
610    Extension(mut query_ctx): Extension<QueryContext>,
611    TraceTableName(table_name): TraceTableName,
612    headers: HeaderMap,
613) -> impl IntoResponse {
614    debug!(
615        "Received Jaeger '/api/services/{}/operations' request, query_params: {:?}, query_ctx: {:?}, headers: {:?}",
616        service_name, query_params, query_ctx, headers
617    );
618
619    update_query_context(&mut query_ctx, table_name);
620    let query_ctx = Arc::new(query_ctx);
621    let db = query_ctx.get_db_string();
622
623    // Record the query time histogram.
624    let _timer = METRIC_JAEGER_QUERY_ELAPSED
625        .with_label_values(&[&db, "/api/services"])
626        .start_timer();
627
628    let (start, end) = match parse_jaeger_time_range_for_operations(&headers, &query_params) {
629        Ok((start, end)) => (start, end),
630        Err(e) => return error_response(e),
631    };
632
633    match handler
634        .get_operations(query_ctx, &service_name, None, start, end)
635        .await
636    {
637        Ok(output) => match covert_to_records(output).await {
638            Ok(Some(records)) => match operations_from_records(records, false) {
639                Ok(operations) => {
640                    let operations: Vec<String> =
641                        operations.into_iter().map(|op| op.name).collect();
642                    let total = operations.len();
643                    (
644                        HttpStatusCode::OK,
645                        axum::Json(JaegerAPIResponse {
646                            data: Some(JaegerData::OperationsNames(operations)),
647                            total,
648                            ..Default::default()
649                        }),
650                    )
651                }
652                Err(err) => {
653                    error!(
654                        "Failed to get operations for service '{}': {:?}",
655                        service_name, err
656                    );
657                    error_response(err)
658                }
659            },
660            Ok(None) => (HttpStatusCode::OK, axum::Json(JaegerAPIResponse::default())),
661            Err(err) => error_response(err),
662        },
663        Err(err) => handle_query_error(
664            err,
665            &format!("Failed to get operations for service '{}'", service_name),
666            &db,
667        ),
668    }
669}
670
671async fn covert_to_records(output: Output) -> Result<Option<HttpRecordsOutput>> {
672    match output.data {
673        OutputData::Stream(stream) => {
674            let records = HttpRecordsOutput::try_new(
675                stream.schema().clone(),
676                util::collect(stream)
677                    .await
678                    .context(CollectRecordbatchSnafu)?,
679            )?;
680            debug!("The query records: {:?}", records);
681            Ok(Some(records))
682        }
683        // It's unlikely to happen. However, if the output is not a stream, return None.
684        _ => Ok(None),
685    }
686}
687
688fn handle_query_error(
689    err: Error,
690    prompt: &str,
691    db: &str,
692) -> (HttpStatusCode, axum::Json<JaegerAPIResponse>) {
693    // To compatible with the Jaeger API, if the trace table is not found, return an empty response instead of an error.
694    if err.status_code() == StatusCode::TableNotFound {
695        warn!(
696            "No trace table '{}' found in database '{}'",
697            TRACE_TABLE_NAME, db
698        );
699        (HttpStatusCode::OK, axum::Json(JaegerAPIResponse::default()))
700    } else {
701        error!("{}: {:?}", prompt, err);
702        error_response(err)
703    }
704}
705
706fn error_response(err: Error) -> (HttpStatusCode, axum::Json<JaegerAPIResponse>) {
707    (
708        status_code_to_http_status(&err.status_code()),
709        axum::Json(JaegerAPIResponse {
710            errors: vec![JaegerAPIError {
711                code: err.status_code() as i32,
712                msg: err.to_string(),
713                ..Default::default()
714            }],
715            ..Default::default()
716        }),
717    )
718}
719
720fn traces_from_records(records: HttpRecordsOutput) -> Result<Vec<Trace>> {
721    // maintain the mapping: trace_id -> (process_id -> service_name).
722    let mut trace_id_to_processes: HashMap<String, HashMap<String, String>> = HashMap::new();
723    // maintain the mapping: trace_id -> spans.
724    let mut trace_id_to_spans: HashMap<String, Vec<Span>> = HashMap::new();
725    // maintain the mapping: service.name -> resource.attributes.
726    let mut service_to_resource_attributes: HashMap<String, Vec<KeyValue>> = HashMap::new();
727
728    let is_span_attributes_flatten = !records
729        .schema
730        .column_schemas
731        .iter()
732        .any(|c| c.name == SPAN_ATTRIBUTES_COLUMN);
733
734    for row in records.rows.into_iter() {
735        let mut span = Span::default();
736        let mut service_name = None;
737        let mut resource_tags = vec![];
738
739        for (idx, cell) in row.into_iter().enumerate() {
740            // safe to use index here
741            let column_name = &records.schema.column_schemas[idx].name;
742
743            match column_name.as_str() {
744                TRACE_ID_COLUMN => {
745                    if let JsonValue::String(trace_id) = cell {
746                        span.trace_id = trace_id.clone();
747                        trace_id_to_processes.entry(trace_id).or_default();
748                    }
749                }
750                TIMESTAMP_COLUMN => {
751                    span.start_time = cell.as_u64().context(InvalidJaegerQuerySnafu {
752                        reason: "Failed to convert timestamp to u64".to_string(),
753                    })? / 1000;
754                }
755                DURATION_NANO_COLUMN => {
756                    span.duration = cell.as_u64().context(InvalidJaegerQuerySnafu {
757                        reason: "Failed to convert duration to u64".to_string(),
758                    })? / 1000;
759                }
760                SERVICE_NAME_COLUMN => {
761                    if let JsonValue::String(name) = cell {
762                        service_name = Some(name);
763                    }
764                }
765                SPAN_NAME_COLUMN => {
766                    if let JsonValue::String(span_name) = cell {
767                        span.operation_name = span_name;
768                    }
769                }
770                SPAN_ID_COLUMN => {
771                    if let JsonValue::String(span_id) = cell {
772                        span.span_id = span_id;
773                    }
774                }
775                SPAN_ATTRIBUTES_COLUMN => {
776                    // for v0 data model, span_attributes are nested as a json
777                    // data structure
778                    if let JsonValue::Object(span_attrs) = cell {
779                        span.tags.extend(object_to_tags(span_attrs));
780                    }
781                }
782                RESOURCE_ATTRIBUTES_COLUMN => {
783                    // for v0 data model, resource_attributes are nested as a json
784                    // data structure
785
786                    if let JsonValue::Object(mut resource_attrs) = cell {
787                        resource_attrs.remove(KEY_SERVICE_NAME);
788                        resource_tags = object_to_tags(resource_attrs);
789                    }
790                }
791                PARENT_SPAN_ID_COLUMN => {
792                    if let JsonValue::String(parent_span_id) = cell
793                        && !parent_span_id.is_empty()
794                    {
795                        span.references.push(Reference {
796                            trace_id: span.trace_id.clone(),
797                            span_id: parent_span_id,
798                            ref_type: REF_TYPE_CHILD_OF.to_string(),
799                        });
800                    }
801                }
802                SPAN_EVENTS_COLUMN => {
803                    if let JsonValue::Array(events) = cell {
804                        for event in events {
805                            if let JsonValue::Object(mut obj) = event {
806                                let Some(action) = obj.get("name").and_then(|v| v.as_str()) else {
807                                    continue;
808                                };
809
810                                let Some(t) =
811                                    obj.get("time").and_then(|t| t.as_str()).and_then(|s| {
812                                        SPAN_KIND_TIME_FMTS
813                                            .iter()
814                                            .find_map(|fmt| {
815                                                chrono::DateTime::parse_from_str(s, fmt).ok()
816                                            })
817                                            .map(|dt| dt.timestamp_micros() as u64)
818                                    })
819                                else {
820                                    continue;
821                                };
822
823                                let mut fields = vec![KeyValue {
824                                    key: "event".to_string(),
825                                    value_type: ValueType::String,
826                                    value: Value::String(action.to_string()),
827                                }];
828
829                                // Add event attributes as fields
830                                if let Some(JsonValue::Object(attrs)) = obj.remove("attributes") {
831                                    fields.extend(object_to_tags(attrs));
832                                }
833
834                                span.logs.push(Log {
835                                    timestamp: t,
836                                    fields,
837                                });
838                            }
839                        }
840                    }
841                }
842                SCOPE_NAME_COLUMN => {
843                    if let JsonValue::String(scope_name) = cell
844                        && !scope_name.is_empty()
845                    {
846                        span.tags.push(KeyValue {
847                            key: KEY_OTEL_SCOPE_NAME.to_string(),
848                            value_type: ValueType::String,
849                            value: Value::String(scope_name),
850                        });
851                    }
852                }
853                SCOPE_VERSION_COLUMN => {
854                    if let JsonValue::String(scope_version) = cell
855                        && !scope_version.is_empty()
856                    {
857                        span.tags.push(KeyValue {
858                            key: KEY_OTEL_SCOPE_VERSION.to_string(),
859                            value_type: ValueType::String,
860                            value: Value::String(scope_version),
861                        });
862                    }
863                }
864                SPAN_KIND_COLUMN => {
865                    if let JsonValue::String(span_kind) = cell
866                        && !span_kind.is_empty()
867                    {
868                        span.tags.push(KeyValue {
869                            key: KEY_SPAN_KIND.to_string(),
870                            value_type: ValueType::String,
871                            value: Value::String(normalize_span_kind(&span_kind)),
872                        });
873                    }
874                }
875                SPAN_STATUS_CODE => {
876                    if let JsonValue::String(span_status) = cell
877                        && span_status != SPAN_STATUS_UNSET
878                        && !span_status.is_empty()
879                    {
880                        span.tags.push(KeyValue {
881                            key: KEY_OTEL_STATUS_CODE.to_string(),
882                            value_type: ValueType::String,
883                            value: Value::String(normalize_status_code(&span_status)),
884                        });
885                    }
886                }
887
888                _ => {
889                    // this this v1 data model
890                    if is_span_attributes_flatten {
891                        const SPAN_ATTR_PREFIX: &str = "span_attributes.";
892                        const RESOURCE_ATTR_PREFIX: &str = "resource_attributes.";
893                        // a span attributes column
894                        if column_name.starts_with(SPAN_ATTR_PREFIX) {
895                            if let Some(keyvalue) = to_keyvalue(
896                                column_name
897                                    .strip_prefix(SPAN_ATTR_PREFIX)
898                                    .unwrap_or_default()
899                                    .to_string(),
900                                cell,
901                            ) {
902                                span.tags.push(keyvalue);
903                            }
904                        } else if column_name.starts_with(RESOURCE_ATTR_PREFIX)
905                            && let Some(keyvalue) = to_keyvalue(
906                                column_name
907                                    .strip_prefix(RESOURCE_ATTR_PREFIX)
908                                    .unwrap_or_default()
909                                    .to_string(),
910                                cell,
911                            )
912                        {
913                            resource_tags.push(keyvalue);
914                        }
915                    }
916                }
917            }
918        }
919
920        if let Some(service_name) = service_name {
921            if !service_to_resource_attributes.contains_key(&service_name) {
922                service_to_resource_attributes.insert(service_name.clone(), resource_tags);
923            }
924
925            if let Some(process) = trace_id_to_processes.get_mut(&span.trace_id) {
926                if let Some(process_id) = process.get(&service_name) {
927                    span.process_id = process_id.clone();
928                } else {
929                    // Allocate a new process id.
930                    let process_id = format!("p{}", process.len() + 1);
931                    process.insert(service_name, process_id.clone());
932                    span.process_id = process_id;
933                }
934            }
935        }
936
937        // ensure span tags order
938        span.tags.sort_by(|a, b| a.key.cmp(&b.key));
939
940        if let Some(spans) = trace_id_to_spans.get_mut(&span.trace_id) {
941            spans.push(span);
942        } else {
943            trace_id_to_spans.insert(span.trace_id.clone(), vec![span]);
944        }
945    }
946
947    let mut traces = Vec::new();
948    for (trace_id, spans) in trace_id_to_spans {
949        let mut trace = Trace {
950            trace_id,
951            spans,
952            ..Default::default()
953        };
954
955        if let Some(processes) = trace_id_to_processes.remove(&trace.trace_id) {
956            let mut process_id_to_process = HashMap::new();
957            for (service_name, process_id) in processes.into_iter() {
958                let tags = service_to_resource_attributes
959                    .remove(&service_name)
960                    .unwrap_or_default();
961                process_id_to_process.insert(process_id, Process { service_name, tags });
962            }
963            trace.processes = process_id_to_process;
964        }
965        traces.push(trace);
966    }
967
968    Ok(traces)
969}
970
971fn to_keyvalue(key: String, value: JsonValue) -> Option<KeyValue> {
972    match value {
973        JsonValue::String(value) => Some(KeyValue {
974            key,
975            value_type: ValueType::String,
976            value: Value::String(value.to_string()),
977        }),
978        JsonValue::Number(value) => Some(KeyValue {
979            key,
980            value_type: ValueType::Int64,
981            value: Value::Int64(value.as_i64().unwrap_or(0)),
982        }),
983        JsonValue::Bool(value) => Some(KeyValue {
984            key,
985            value_type: ValueType::Boolean,
986            value: Value::Boolean(value),
987        }),
988        JsonValue::Array(value) => Some(KeyValue {
989            key,
990            value_type: ValueType::String,
991            value: Value::String(serde_json::to_string(&value).unwrap()),
992        }),
993        JsonValue::Object(value) => Some(KeyValue {
994            key,
995            value_type: ValueType::String,
996            value: Value::String(serde_json::to_string(&value).unwrap()),
997        }),
998        JsonValue::Null => None,
999    }
1000}
1001
1002fn object_to_tags(object: serde_json::map::Map<String, JsonValue>) -> Vec<KeyValue> {
1003    object
1004        .into_iter()
1005        .filter_map(|(key, value)| to_keyvalue(key, value))
1006        .collect()
1007}
1008
1009fn services_from_records(records: HttpRecordsOutput) -> Result<Vec<String>> {
1010    let expected_schema = vec![(SERVICE_NAME_COLUMN, "String")];
1011    check_schema(&records, &expected_schema)?;
1012
1013    let mut services = Vec::with_capacity(records.total_rows);
1014    for row in records.rows.into_iter() {
1015        for value in row.into_iter() {
1016            if let JsonValue::String(service_name) = value {
1017                services.push(service_name);
1018            }
1019        }
1020    }
1021    Ok(services)
1022}
1023
1024// Construct Jaeger operations from records.
1025fn operations_from_records(
1026    records: HttpRecordsOutput,
1027    contain_span_kind: bool,
1028) -> Result<Vec<Operation>> {
1029    let expected_schema = vec![(SPAN_NAME_COLUMN, "String"), (SPAN_KIND_COLUMN, "String")];
1030    check_schema(&records, &expected_schema)?;
1031
1032    let mut operations = Vec::with_capacity(records.total_rows);
1033    for row in records.rows.into_iter() {
1034        let mut row_iter = row.into_iter();
1035        if let Some(JsonValue::String(operation)) = row_iter.next() {
1036            let mut operation = Operation {
1037                name: operation,
1038                span_kind: None,
1039            };
1040            if contain_span_kind {
1041                if let Some(JsonValue::String(span_kind)) = row_iter.next() {
1042                    operation.span_kind = Some(normalize_span_kind(&span_kind));
1043                }
1044            } else {
1045                // skip span kind.
1046                row_iter.next();
1047            }
1048            operations.push(operation);
1049        }
1050    }
1051
1052    Ok(operations)
1053}
1054
1055// Check whether the schema of the records is correct.
1056fn check_schema(records: &HttpRecordsOutput, expected_schema: &[(&str, &str)]) -> Result<()> {
1057    for (i, column) in records.schema.column_schemas.iter().enumerate() {
1058        if column.name != expected_schema[i].0 || column.data_type != expected_schema[i].1 {
1059            InvalidJaegerQuerySnafu {
1060                reason: "query result schema is not correct".to_string(),
1061            }
1062            .fail()?
1063        }
1064    }
1065    Ok(())
1066}
1067
1068// By default, the span kind is stored as `SPAN_KIND_<kind>` in GreptimeDB.
1069// However, in Jaeger API, the span kind is returned as `<kind>` which is the lowercase of the span kind and without the `SPAN_KIND_` prefix.
1070fn normalize_span_kind(span_kind: &str) -> String {
1071    // If the span_kind starts with `SPAN_KIND_` prefix, remove it and convert to lowercase.
1072    if let Some(stripped) = span_kind.strip_prefix(SPAN_KIND_PREFIX) {
1073        stripped.to_lowercase()
1074    } else {
1075        // It's unlikely to happen. However, we still convert it to lowercase for consistency.
1076        span_kind.to_lowercase()
1077    }
1078}
1079
1080// By default, the status code is stored as `STATUS_CODE_<code>` in GreptimeDB.
1081// However, in Jaeger API, the status code is returned as `<code>` without the `STATUS_CODE_` prefix.
1082fn normalize_status_code(status_code: &str) -> String {
1083    // If the span_kind starts with `SPAN_KIND_` prefix, remove it and convert to lowercase.
1084    if let Some(stripped) = status_code.strip_prefix(SPAN_STATUS_PREFIX) {
1085        stripped.to_string()
1086    } else {
1087        // It's unlikely to happen
1088        status_code.to_string()
1089    }
1090}
1091
1092fn convert_string_to_number(input: &serde_json::Value) -> Option<serde_json::Value> {
1093    if let Some(data) = input.as_str() {
1094        if let Ok(number) = data.parse::<i64>() {
1095            return Some(serde_json::Value::Number(serde_json::Number::from(number)));
1096        }
1097        if let Ok(number) = data.parse::<f64>()
1098            && let Some(number) = serde_json::Number::from_f64(number)
1099        {
1100            return Some(serde_json::Value::Number(number));
1101        }
1102    }
1103
1104    None
1105}
1106
1107fn convert_string_to_boolean(input: &serde_json::Value) -> Option<serde_json::Value> {
1108    if let Some(data) = input.as_str() {
1109        if data == "true" {
1110            return Some(serde_json::Value::Bool(true));
1111        }
1112        if data == "false" {
1113            return Some(serde_json::Value::Bool(false));
1114        }
1115    }
1116
1117    None
1118}
1119
1120fn parse_jaeger_time_range_for_operations(
1121    headers: &HeaderMap,
1122    query_params: &JaegerQueryParams,
1123) -> Result<(Option<i64>, Option<i64>)> {
1124    if let Some(time_range) = headers.get(JAEGER_TIME_RANGE_FOR_OPERATIONS_HEADER) {
1125        match time_range.to_str() {
1126            Ok(time_range) => match humantime::parse_duration(time_range) {
1127                Ok(duration) => {
1128                    debug!(
1129                        "Get operations with time range: {:?}, duration: {:?}",
1130                        time_range, duration
1131                    );
1132                    let now = Utc::now().timestamp_micros();
1133                    Ok((Some(now - duration.as_micros() as i64), Some(now)))
1134                }
1135                Err(e) => {
1136                    error!("Failed to parse time range header: {:?}", e);
1137                    Err(InvalidJaegerQuerySnafu {
1138                        reason: format!("invalid time range header: {:?}", time_range),
1139                    }
1140                    .build())
1141                }
1142            },
1143            Err(e) => {
1144                error!("Failed to convert time range header to string: {:?}", e);
1145                Err(InvalidJaegerQuerySnafu {
1146                    reason: format!("invalid time range header: {:?}", time_range),
1147                }
1148                .build())
1149            }
1150        }
1151    } else {
1152        Ok((query_params.start, query_params.end))
1153    }
1154}
1155
1156#[cfg(test)]
1157mod tests {
1158    use serde_json::{Number, Value as JsonValue, json};
1159
1160    use super::*;
1161    use crate::http::{ColumnSchema, HttpRecordsOutput, OutputSchema};
1162
1163    #[test]
1164    fn test_services_from_records() {
1165        // The tests is the tuple of `(test_records, expected)`.
1166        let tests = vec![(
1167            HttpRecordsOutput {
1168                schema: OutputSchema {
1169                    column_schemas: vec![ColumnSchema {
1170                        name: "service_name".to_string(),
1171                        data_type: "String".to_string(),
1172                    }],
1173                },
1174                rows: vec![
1175                    vec![JsonValue::String("test-service-0".to_string())],
1176                    vec![JsonValue::String("test-service-1".to_string())],
1177                ],
1178                total_rows: 2,
1179                metrics: HashMap::new(),
1180            },
1181            vec!["test-service-0".to_string(), "test-service-1".to_string()],
1182        )];
1183
1184        for (records, expected) in tests {
1185            let services = services_from_records(records).unwrap();
1186            assert_eq!(services, expected);
1187        }
1188    }
1189
1190    #[test]
1191    fn test_operations_from_records() {
1192        // The tests is the tuple of `(test_records, contain_span_kind, expected)`.
1193        let tests = vec![
1194            (
1195                HttpRecordsOutput {
1196                    schema: OutputSchema {
1197                        column_schemas: vec![
1198                            ColumnSchema {
1199                                name: "span_name".to_string(),
1200                                data_type: "String".to_string(),
1201                            },
1202                            ColumnSchema {
1203                                name: "span_kind".to_string(),
1204                                data_type: "String".to_string(),
1205                            },
1206                        ],
1207                    },
1208                    rows: vec![
1209                        vec![
1210                            JsonValue::String("access-mysql".to_string()),
1211                            JsonValue::String("SPAN_KIND_SERVER".to_string()),
1212                        ],
1213                        vec![
1214                            JsonValue::String("access-redis".to_string()),
1215                            JsonValue::String("SPAN_KIND_CLIENT".to_string()),
1216                        ],
1217                    ],
1218                    total_rows: 2,
1219                    metrics: HashMap::new(),
1220                },
1221                false,
1222                vec![
1223                    Operation {
1224                        name: "access-mysql".to_string(),
1225                        span_kind: None,
1226                    },
1227                    Operation {
1228                        name: "access-redis".to_string(),
1229                        span_kind: None,
1230                    },
1231                ],
1232            ),
1233            (
1234                HttpRecordsOutput {
1235                    schema: OutputSchema {
1236                        column_schemas: vec![
1237                            ColumnSchema {
1238                                name: "span_name".to_string(),
1239                                data_type: "String".to_string(),
1240                            },
1241                            ColumnSchema {
1242                                name: "span_kind".to_string(),
1243                                data_type: "String".to_string(),
1244                            },
1245                        ],
1246                    },
1247                    rows: vec![
1248                        vec![
1249                            JsonValue::String("access-mysql".to_string()),
1250                            JsonValue::String("SPAN_KIND_SERVER".to_string()),
1251                        ],
1252                        vec![
1253                            JsonValue::String("access-redis".to_string()),
1254                            JsonValue::String("SPAN_KIND_CLIENT".to_string()),
1255                        ],
1256                    ],
1257                    total_rows: 2,
1258                    metrics: HashMap::new(),
1259                },
1260                true,
1261                vec![
1262                    Operation {
1263                        name: "access-mysql".to_string(),
1264                        span_kind: Some("server".to_string()),
1265                    },
1266                    Operation {
1267                        name: "access-redis".to_string(),
1268                        span_kind: Some("client".to_string()),
1269                    },
1270                ],
1271            ),
1272        ];
1273
1274        for (records, contain_span_kind, expected) in tests {
1275            let operations = operations_from_records(records, contain_span_kind).unwrap();
1276            assert_eq!(operations, expected);
1277        }
1278    }
1279
1280    #[test]
1281    fn test_traces_from_records() {
1282        // The tests is the tuple of `(test_records, expected)`.
1283        let tests = vec![(
1284            HttpRecordsOutput {
1285                schema: OutputSchema {
1286                    column_schemas: vec![
1287                        ColumnSchema {
1288                            name: "trace_id".to_string(),
1289                            data_type: "String".to_string(),
1290                        },
1291                        ColumnSchema {
1292                            name: "timestamp".to_string(),
1293                            data_type: "TimestampNanosecond".to_string(),
1294                        },
1295                        ColumnSchema {
1296                            name: "duration_nano".to_string(),
1297                            data_type: "UInt64".to_string(),
1298                        },
1299                        ColumnSchema {
1300                            name: "service_name".to_string(),
1301                            data_type: "String".to_string(),
1302                        },
1303                        ColumnSchema {
1304                            name: "span_name".to_string(),
1305                            data_type: "String".to_string(),
1306                        },
1307                        ColumnSchema {
1308                            name: "span_id".to_string(),
1309                            data_type: "String".to_string(),
1310                        },
1311                        ColumnSchema {
1312                            name: "span_attributes".to_string(),
1313                            data_type: "Json".to_string(),
1314                        },
1315                    ],
1316                },
1317                rows: vec![
1318                    vec![
1319                        JsonValue::String("5611dce1bc9ebed65352d99a027b08ea".to_string()),
1320                        JsonValue::Number(Number::from_u128(1738726754492422000).unwrap()),
1321                        JsonValue::Number(Number::from_u128(100000000).unwrap()),
1322                        JsonValue::String("test-service-0".to_string()),
1323                        JsonValue::String("access-mysql".to_string()),
1324                        JsonValue::String("008421dbbd33a3e9".to_string()),
1325                        JsonValue::Object(
1326                            json!({
1327                                "operation.type": "access-mysql",
1328                            })
1329                            .as_object()
1330                            .unwrap()
1331                            .clone(),
1332                        ),
1333                    ],
1334                    vec![
1335                        JsonValue::String("5611dce1bc9ebed65352d99a027b08ea".to_string()),
1336                        JsonValue::Number(Number::from_u128(1738726754642422000).unwrap()),
1337                        JsonValue::Number(Number::from_u128(100000000).unwrap()),
1338                        JsonValue::String("test-service-0".to_string()),
1339                        JsonValue::String("access-redis".to_string()),
1340                        JsonValue::String("ffa03416a7b9ea48".to_string()),
1341                        JsonValue::Object(
1342                            json!({
1343                                "operation.type": "access-redis",
1344                            })
1345                            .as_object()
1346                            .unwrap()
1347                            .clone(),
1348                        ),
1349                    ],
1350                ],
1351                total_rows: 2,
1352                metrics: HashMap::new(),
1353            },
1354            vec![Trace {
1355                trace_id: "5611dce1bc9ebed65352d99a027b08ea".to_string(),
1356                spans: vec![
1357                    Span {
1358                        trace_id: "5611dce1bc9ebed65352d99a027b08ea".to_string(),
1359                        span_id: "008421dbbd33a3e9".to_string(),
1360                        operation_name: "access-mysql".to_string(),
1361                        start_time: 1738726754492422,
1362                        duration: 100000,
1363                        tags: vec![KeyValue {
1364                            key: "operation.type".to_string(),
1365                            value_type: ValueType::String,
1366                            value: Value::String("access-mysql".to_string()),
1367                        }],
1368                        process_id: "p1".to_string(),
1369                        ..Default::default()
1370                    },
1371                    Span {
1372                        trace_id: "5611dce1bc9ebed65352d99a027b08ea".to_string(),
1373                        span_id: "ffa03416a7b9ea48".to_string(),
1374                        operation_name: "access-redis".to_string(),
1375                        start_time: 1738726754642422,
1376                        duration: 100000,
1377                        tags: vec![KeyValue {
1378                            key: "operation.type".to_string(),
1379                            value_type: ValueType::String,
1380                            value: Value::String("access-redis".to_string()),
1381                        }],
1382                        process_id: "p1".to_string(),
1383                        ..Default::default()
1384                    },
1385                ],
1386                processes: HashMap::from([(
1387                    "p1".to_string(),
1388                    Process {
1389                        service_name: "test-service-0".to_string(),
1390                        tags: vec![],
1391                    },
1392                )]),
1393                ..Default::default()
1394            }],
1395        )];
1396
1397        for (records, expected) in tests {
1398            let traces = traces_from_records(records).unwrap();
1399            assert_eq!(traces, expected);
1400        }
1401    }
1402
1403    #[test]
1404    fn test_traces_from_v1_records() {
1405        // The tests is the tuple of `(test_records, expected)`.
1406        let tests = vec![(
1407            HttpRecordsOutput {
1408                schema: OutputSchema {
1409                    column_schemas: vec![
1410                        ColumnSchema {
1411                            name: "trace_id".to_string(),
1412                            data_type: "String".to_string(),
1413                        },
1414                        ColumnSchema {
1415                            name: "timestamp".to_string(),
1416                            data_type: "TimestampNanosecond".to_string(),
1417                        },
1418                        ColumnSchema {
1419                            name: "duration_nano".to_string(),
1420                            data_type: "UInt64".to_string(),
1421                        },
1422                        ColumnSchema {
1423                            name: "service_name".to_string(),
1424                            data_type: "String".to_string(),
1425                        },
1426                        ColumnSchema {
1427                            name: "span_name".to_string(),
1428                            data_type: "String".to_string(),
1429                        },
1430                        ColumnSchema {
1431                            name: "span_id".to_string(),
1432                            data_type: "String".to_string(),
1433                        },
1434                        ColumnSchema {
1435                            name: "span_attributes.http.request.method".to_string(),
1436                            data_type: "String".to_string(),
1437                        },
1438                        ColumnSchema {
1439                            name: "span_attributes.http.request.url".to_string(),
1440                            data_type: "String".to_string(),
1441                        },
1442                        ColumnSchema {
1443                            name: "span_attributes.http.status_code".to_string(),
1444                            data_type: "UInt64".to_string(),
1445                        },
1446                    ],
1447                },
1448                rows: vec![
1449                    vec![
1450                        JsonValue::String("5611dce1bc9ebed65352d99a027b08ea".to_string()),
1451                        JsonValue::Number(Number::from_u128(1738726754492422000).unwrap()),
1452                        JsonValue::Number(Number::from_u128(100000000).unwrap()),
1453                        JsonValue::String("test-service-0".to_string()),
1454                        JsonValue::String("access-mysql".to_string()),
1455                        JsonValue::String("008421dbbd33a3e9".to_string()),
1456                        JsonValue::String("GET".to_string()),
1457                        JsonValue::String("/data".to_string()),
1458                        JsonValue::Number(Number::from_u128(200).unwrap()),
1459                    ],
1460                    vec![
1461                        JsonValue::String("5611dce1bc9ebed65352d99a027b08ea".to_string()),
1462                        JsonValue::Number(Number::from_u128(1738726754642422000).unwrap()),
1463                        JsonValue::Number(Number::from_u128(100000000).unwrap()),
1464                        JsonValue::String("test-service-0".to_string()),
1465                        JsonValue::String("access-redis".to_string()),
1466                        JsonValue::String("ffa03416a7b9ea48".to_string()),
1467                        JsonValue::String("POST".to_string()),
1468                        JsonValue::String("/create".to_string()),
1469                        JsonValue::Number(Number::from_u128(400).unwrap()),
1470                    ],
1471                ],
1472                total_rows: 2,
1473                metrics: HashMap::new(),
1474            },
1475            vec![Trace {
1476                trace_id: "5611dce1bc9ebed65352d99a027b08ea".to_string(),
1477                spans: vec![
1478                    Span {
1479                        trace_id: "5611dce1bc9ebed65352d99a027b08ea".to_string(),
1480                        span_id: "008421dbbd33a3e9".to_string(),
1481                        operation_name: "access-mysql".to_string(),
1482                        start_time: 1738726754492422,
1483                        duration: 100000,
1484                        tags: vec![
1485                            KeyValue {
1486                                key: "http.request.method".to_string(),
1487                                value_type: ValueType::String,
1488                                value: Value::String("GET".to_string()),
1489                            },
1490                            KeyValue {
1491                                key: "http.request.url".to_string(),
1492                                value_type: ValueType::String,
1493                                value: Value::String("/data".to_string()),
1494                            },
1495                            KeyValue {
1496                                key: "http.status_code".to_string(),
1497                                value_type: ValueType::Int64,
1498                                value: Value::Int64(200),
1499                            },
1500                        ],
1501                        process_id: "p1".to_string(),
1502                        ..Default::default()
1503                    },
1504                    Span {
1505                        trace_id: "5611dce1bc9ebed65352d99a027b08ea".to_string(),
1506                        span_id: "ffa03416a7b9ea48".to_string(),
1507                        operation_name: "access-redis".to_string(),
1508                        start_time: 1738726754642422,
1509                        duration: 100000,
1510                        tags: vec![
1511                            KeyValue {
1512                                key: "http.request.method".to_string(),
1513                                value_type: ValueType::String,
1514                                value: Value::String("POST".to_string()),
1515                            },
1516                            KeyValue {
1517                                key: "http.request.url".to_string(),
1518                                value_type: ValueType::String,
1519                                value: Value::String("/create".to_string()),
1520                            },
1521                            KeyValue {
1522                                key: "http.status_code".to_string(),
1523                                value_type: ValueType::Int64,
1524                                value: Value::Int64(400),
1525                            },
1526                        ],
1527                        process_id: "p1".to_string(),
1528                        ..Default::default()
1529                    },
1530                ],
1531                processes: HashMap::from([(
1532                    "p1".to_string(),
1533                    Process {
1534                        service_name: "test-service-0".to_string(),
1535                        tags: vec![],
1536                    },
1537                )]),
1538                ..Default::default()
1539            }],
1540        )];
1541
1542        for (records, expected) in tests {
1543            let traces = traces_from_records(records).unwrap();
1544            assert_eq!(traces, expected);
1545        }
1546    }
1547
1548    #[test]
1549    fn test_from_jaeger_query_params() {
1550        // The tests is the tuple of `(test_query_params, expected)`.
1551        let tests = vec![
1552            (
1553                JaegerQueryParams {
1554                    service_name: Some("test-service-0".to_string()),
1555                    ..Default::default()
1556                },
1557                QueryTraceParams {
1558                    service_name: "test-service-0".to_string(),
1559                    ..Default::default()
1560                },
1561            ),
1562            (
1563                JaegerQueryParams {
1564                    service_name: Some("test-service-0".to_string()),
1565                    operation_name: Some("access-mysql".to_string()),
1566                    start: Some(1738726754492422),
1567                    end: Some(1738726754642422),
1568                    max_duration: Some("100ms".to_string()),
1569                    min_duration: Some("50ms".to_string()),
1570                    limit: Some(10),
1571                    tags: Some("{\"http.status_code\":\"200\",\"latency\":\"11.234\",\"error\":\"false\",\"http.method\":\"GET\",\"http.path\":\"/api/v1/users\"}".to_string()),
1572                    ..Default::default()
1573                },
1574                QueryTraceParams {
1575                    service_name: "test-service-0".to_string(),
1576                    operation_name: Some("access-mysql".to_string()),
1577                    start_time: Some(1738726754492422000),
1578                    end_time: Some(1738726754642422000),
1579                    min_duration: Some(50000000),
1580                    max_duration: Some(100000000),
1581                    limit: Some(10),
1582                    tags: Some(HashMap::from([
1583                        ("http.status_code".to_string(), JsonValue::Number(Number::from(200))),
1584                        ("latency".to_string(), JsonValue::Number(Number::from_f64(11.234).unwrap())),
1585                        ("error".to_string(), JsonValue::Bool(false)),
1586                        ("http.method".to_string(), JsonValue::String("GET".to_string())),
1587                        ("http.path".to_string(), JsonValue::String("/api/v1/users".to_string())),
1588                    ])),
1589                },
1590            ),
1591        ];
1592
1593        for (query_params, expected) in tests {
1594            let query_params = QueryTraceParams::from_jaeger_query_params(query_params).unwrap();
1595            assert_eq!(query_params, expected);
1596        }
1597    }
1598
1599    #[test]
1600    fn test_check_schema() {
1601        // The tests is the tuple of `(test_records, expected_schema, is_ok)`.
1602        let tests = vec![(
1603            HttpRecordsOutput {
1604                schema: OutputSchema {
1605                    column_schemas: vec![
1606                        ColumnSchema {
1607                            name: "trace_id".to_string(),
1608                            data_type: "String".to_string(),
1609                        },
1610                        ColumnSchema {
1611                            name: "timestamp".to_string(),
1612                            data_type: "TimestampNanosecond".to_string(),
1613                        },
1614                        ColumnSchema {
1615                            name: "duration_nano".to_string(),
1616                            data_type: "UInt64".to_string(),
1617                        },
1618                        ColumnSchema {
1619                            name: "service_name".to_string(),
1620                            data_type: "String".to_string(),
1621                        },
1622                        ColumnSchema {
1623                            name: "span_name".to_string(),
1624                            data_type: "String".to_string(),
1625                        },
1626                        ColumnSchema {
1627                            name: "span_id".to_string(),
1628                            data_type: "String".to_string(),
1629                        },
1630                        ColumnSchema {
1631                            name: "span_attributes".to_string(),
1632                            data_type: "Json".to_string(),
1633                        },
1634                    ],
1635                },
1636                rows: vec![],
1637                total_rows: 0,
1638                metrics: HashMap::new(),
1639            },
1640            vec![
1641                (TRACE_ID_COLUMN, "String"),
1642                (TIMESTAMP_COLUMN, "TimestampNanosecond"),
1643                (DURATION_NANO_COLUMN, "UInt64"),
1644                (SERVICE_NAME_COLUMN, "String"),
1645                (SPAN_NAME_COLUMN, "String"),
1646                (SPAN_ID_COLUMN, "String"),
1647                (SPAN_ATTRIBUTES_COLUMN, "Json"),
1648            ],
1649            true,
1650        )];
1651
1652        for (records, expected_schema, is_ok) in tests {
1653            let result = check_schema(&records, &expected_schema);
1654            assert_eq!(result.is_ok(), is_ok);
1655        }
1656    }
1657
1658    #[test]
1659    fn test_normalize_span_kind() {
1660        let tests = vec![
1661            ("SPAN_KIND_SERVER".to_string(), "server".to_string()),
1662            ("SPAN_KIND_CLIENT".to_string(), "client".to_string()),
1663        ];
1664
1665        for (input, expected) in tests {
1666            let result = normalize_span_kind(&input);
1667            assert_eq!(result, expected);
1668        }
1669    }
1670
1671    #[test]
1672    fn test_convert_string_to_number() {
1673        let tests = vec![
1674            (
1675                JsonValue::String("123".to_string()),
1676                Some(JsonValue::Number(Number::from(123))),
1677            ),
1678            (
1679                JsonValue::String("123.456".to_string()),
1680                Some(JsonValue::Number(Number::from_f64(123.456).unwrap())),
1681            ),
1682        ];
1683
1684        for (input, expected) in tests {
1685            let result = convert_string_to_number(&input);
1686            assert_eq!(result, expected);
1687        }
1688    }
1689
1690    #[test]
1691    fn test_convert_string_to_boolean() {
1692        let tests = vec![
1693            (
1694                JsonValue::String("true".to_string()),
1695                Some(JsonValue::Bool(true)),
1696            ),
1697            (
1698                JsonValue::String("false".to_string()),
1699                Some(JsonValue::Bool(false)),
1700            ),
1701        ];
1702
1703        for (input, expected) in tests {
1704            let result = convert_string_to_boolean(&input);
1705            assert_eq!(result, expected);
1706        }
1707    }
1708}