servers/http/
jaeger.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, HashMap};
16use std::fmt;
17use std::str::FromStr;
18use std::sync::Arc;
19
20use axum::Extension;
21use axum::extract::{Path, Query, State};
22use axum::http::{HeaderMap, StatusCode as HttpStatusCode};
23use axum::response::IntoResponse;
24use axum_extra::TypedHeader;
25use common_catalog::consts::{PARENT_SPAN_ID_COLUMN, TRACE_TABLE_NAME};
26use common_error::ext::ErrorExt;
27use common_error::status_code::StatusCode;
28use common_query::{Output, OutputData};
29use common_recordbatch::util;
30use common_telemetry::{debug, error, tracing, warn};
31use headers::UserAgent;
32use serde::{Deserialize, Deserializer, Serialize, de};
33use serde_json::Value as JsonValue;
34use session::context::{Channel, QueryContext};
35use snafu::{OptionExt, ResultExt};
36
37use crate::error::{
38    CollectRecordbatchSnafu, Error, InvalidJaegerQuerySnafu, Result, status_code_to_http_status,
39};
40use crate::http::HttpRecordsOutput;
41use crate::http::extractor::TraceTableName;
42use crate::metrics::METRIC_JAEGER_QUERY_ELAPSED;
43use crate::otlp::trace::{
44    DURATION_NANO_COLUMN, KEY_OTEL_SCOPE_NAME, KEY_OTEL_SCOPE_VERSION, KEY_OTEL_STATUS_CODE,
45    KEY_OTEL_STATUS_ERROR_KEY, KEY_OTEL_STATUS_MESSAGE, KEY_OTEL_TRACE_STATE, KEY_SERVICE_NAME,
46    KEY_SPAN_KIND, RESOURCE_ATTRIBUTES_COLUMN, SCOPE_NAME_COLUMN, SCOPE_VERSION_COLUMN,
47    SERVICE_NAME_COLUMN, SPAN_ATTRIBUTES_COLUMN, SPAN_EVENTS_COLUMN, SPAN_ID_COLUMN,
48    SPAN_KIND_COLUMN, SPAN_KIND_PREFIX, SPAN_NAME_COLUMN, SPAN_STATUS_CODE, SPAN_STATUS_ERROR,
49    SPAN_STATUS_MESSAGE_COLUMN, SPAN_STATUS_PREFIX, SPAN_STATUS_UNSET, TIMESTAMP_COLUMN,
50    TRACE_ID_COLUMN, TRACE_STATE_COLUMN,
51};
52use crate::query_handler::JaegerQueryHandlerRef;
53
/// Query-context extension key under which the Jaeger handlers store the trace
/// table name taken from the request.
pub const JAEGER_QUERY_TABLE_NAME_KEY: &str = "jaeger_query_table_name";

/// Jaeger reference type used when linking a span to its parent span.
const REF_TYPE_CHILD_OF: &str = "CHILD_OF";
// NOTE(review): despite the `SPAN_KIND_` prefix, these formats are used to parse the
// `time` field of span *events*, not span kinds.
/// Timestamp formats tried, in order, when parsing a span event's `time` string.
/// The two entries differ only in fractional-second precision (6 vs 9 digits).
const SPAN_KIND_TIME_FMTS: [&str; 2] = ["%Y-%m-%d %H:%M:%S%.6f%z", "%Y-%m-%d %H:%M:%S%.9f%z"];

/// Error code placed in the response body when a trace lookup finds nothing.
const TRACE_NOT_FOUND_ERROR_CODE: i32 = 404;
/// Error message paired with `TRACE_NOT_FOUND_ERROR_CODE`.
const TRACE_NOT_FOUND_ERROR_MSG: &str = "trace not found";
61
/// JaegerAPIResponse is the response of Jaeger HTTP API.
/// The original version is `structuredResponse` which is defined in
/// <https://github.com/jaegertracing/jaeger/blob/main/cmd/query/app/http_handler.go>.
///
/// Successful responses fill `data` (and, for list endpoints, `total`); failed
/// responses leave `data` as `None` and carry one or more entries in `errors`.
#[derive(Default, Debug, Serialize, Deserialize, PartialEq)]
pub struct JaegerAPIResponse {
    /// Payload of a successful query; `None` for errors and for empty results.
    pub data: Option<JaegerData>,
    /// Number of items in `data`, set by the services/operations handlers; the
    /// trace handlers leave it at 0.
    pub total: usize,
    // `limit` and `offset` mirror Jaeger's response shape; the handlers shown here
    // never set them, so they serialize as 0.
    pub limit: usize,
    pub offset: usize,
    /// Errors reported to the client; empty on success.
    pub errors: Vec<JaegerAPIError>,
}
72
73impl JaegerAPIResponse {
74    pub fn trace_not_found() -> Self {
75        Self {
76            data: None,
77            total: 0,
78            limit: 0,
79            offset: 0,
80            errors: vec![JaegerAPIError {
81                code: TRACE_NOT_FOUND_ERROR_CODE,
82                msg: TRACE_NOT_FOUND_ERROR_MSG.to_string(),
83                trace_id: None,
84            }],
85        }
86    }
87}
88
/// JaegerData is the query result of Jaeger HTTP API.
///
/// The enum is untagged, so only the inner value is written to the `data` field.
// NOTE(review): because it is untagged, deserialization cannot tell
// `ServiceNames` from `OperationsNames` (both are `Vec<String>`); serde picks the
// first matching variant, so such payloads (and empty arrays) always come back as
// `ServiceNames`.
#[derive(Debug, Serialize, Deserialize, PartialEq)]
#[serde(untagged)]
pub enum JaegerData {
    /// Service names returned by `/api/services`.
    ServiceNames(Vec<String>),
    /// Bare operation names returned by `/api/services/{service_name}/operations`.
    OperationsNames(Vec<String>),
    /// Operations returned by `/api/operations`.
    Operations(Vec<Operation>),
    /// Traces returned by the trace lookup and trace search endpoints.
    Traces(Vec<Trace>),
}
98
/// JaegerAPIError is the error of Jaeger HTTP API.
#[derive(Default, Debug, Serialize, Deserialize, PartialEq)]
#[serde(rename_all = "camelCase")]
pub struct JaegerAPIError {
    /// Error code: either an HTTP-style code (400, 404) or a GreptimeDB status
    /// code, depending on which handler produced the error.
    pub code: i32,
    /// Human-readable error message.
    pub msg: String,
    /// Trace the error refers to, if any; serialized as `traceId` and omitted
    /// when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub trace_id: Option<String>,
}
108
/// Operation is an operation in a service.
#[derive(Debug, Default, Serialize, Deserialize, PartialEq)]
#[serde(rename_all = "camelCase")]
pub struct Operation {
    /// Operation name.
    pub name: String,
    /// Span kind of the operation; serialized as `spanKind` and omitted when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub span_kind: Option<String>,
}
117
/// Trace is a collection of spans.
#[derive(Debug, Default, Serialize, Deserialize, PartialEq)]
#[serde(rename_all = "camelCase")]
pub struct Trace {
    /// Trace identifier, serialized as `traceID` to match Jaeger's JSON.
    #[serde(rename = "traceID")]
    pub trace_id: String,
    /// Spans belonging to this trace.
    pub spans: Vec<Span>,

    /// Processes of this trace, presumably keyed by the process ID that spans
    /// reference through `Span::process_id`; omitted from the JSON when empty.
    #[serde(skip_serializing_if = "HashMap::is_empty")]
    pub processes: HashMap<String, Process>,

    /// Warnings attached to this trace; omitted from the JSON when empty.
    #[serde(skip_serializing_if = "Vec::is_empty")]
    pub warnings: Vec<String>,
}
132
/// Span is a single operation within a trace.
#[derive(Debug, Default, Serialize, Deserialize, PartialEq)]
#[serde(rename_all = "camelCase")]
pub struct Span {
    /// Trace this span belongs to (JSON key `traceID`).
    #[serde(rename = "traceID")]
    pub trace_id: String,

    /// Span identifier (JSON key `spanID`).
    #[serde(rename = "spanID")]
    pub span_id: String,

    /// Parent span identifier (JSON key `parentSpanID`); omitted when empty.
    // NOTE(review): the visible part of `traces_from_records` records the parent as a
    // CHILD_OF entry in `references` instead; confirm whether this field is ever set.
    #[serde(rename = "parentSpanID")]
    #[serde(skip_serializing_if = "String::is_empty")]
    pub parent_span_id: String,

    /// Optional span flags; omitted when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub flags: Option<u32>,

    /// Span name (JSON key `operationName`).
    pub operation_name: String,
    /// Links to other spans, e.g. the parent span as `CHILD_OF`.
    pub references: Vec<Reference>,
    pub start_time: u64, // microseconds since unix epoch
    pub duration: u64,   // microseconds
    /// Span attributes rendered as typed key/value tags.
    pub tags: Vec<KeyValue>,
    /// Timestamped log entries; presumably populated from span events — confirm.
    pub logs: Vec<Log>,

    /// Key of the emitting process in `Trace::processes` (JSON key `processID`);
    /// omitted when empty.
    #[serde(rename = "processID")]
    #[serde(skip_serializing_if = "String::is_empty")]
    pub process_id: String,

    /// Inline description of the emitting process; omitted when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub process: Option<Process>,

    /// Warnings attached to this span; omitted when empty.
    #[serde(skip_serializing_if = "Vec::is_empty")]
    pub warnings: Vec<String>,
}
167
/// Reference is a reference from one span to another.
#[derive(Debug, Serialize, Deserialize, PartialEq)]
#[serde(rename_all = "camelCase")]
pub struct Reference {
    /// Trace of the referenced span (JSON key `traceID`).
    #[serde(rename = "traceID")]
    pub trace_id: String,
    /// Referenced span (JSON key `spanID`).
    #[serde(rename = "spanID")]
    pub span_id: String,
    /// Relationship type such as `CHILD_OF` (JSON key `refType`).
    pub ref_type: String,
}
178
/// Process is the process emitting a set of spans.
#[derive(Debug, Serialize, Deserialize, PartialEq)]
#[serde(rename_all = "camelCase")]
pub struct Process {
    /// Name of the emitting service (JSON key `serviceName`).
    pub service_name: String,
    /// Process-level tags; presumably the service's resource attributes — confirm.
    pub tags: Vec<KeyValue>,
}
186
/// Log is a log emitted in a span.
#[derive(Debug, Serialize, Deserialize, PartialEq)]
#[serde(rename_all = "camelCase")]
pub struct Log {
    /// Log time; presumably microseconds since the unix epoch, like
    /// `Span::start_time` — confirm.
    pub timestamp: u64,
    /// Structured fields of the log entry.
    pub fields: Vec<KeyValue>,
}
194
/// KeyValue is a key-value pair with typed value.
#[derive(Debug, Serialize, Deserialize, PartialEq)]
#[serde(rename_all = "camelCase")]
pub struct KeyValue {
    /// Attribute name.
    pub key: String,
    /// Declared type of `value`, written under the JSON key `type`.
    #[serde(rename = "type")]
    pub value_type: ValueType,
    /// Attribute value. It is expected to agree with `value_type`, but this type
    /// does not enforce that.
    pub value: Value,
}
204
/// Value is the value of a key-value pair in Jaeger Span attributes.
///
/// Untagged: serialized as the bare JSON value. When deserializing, serde tries
/// the variants in declaration order, so an integral number that fits in `i64`
/// becomes `Int64` before `Float64` is considered.
#[derive(Debug, Serialize, Deserialize, PartialEq)]
#[serde(untagged)]
// NOTE(review): `rename_all` has no visible effect on an untagged enum, because
// variant names are never written.
#[serde(rename_all = "camelCase")]
pub enum Value {
    String(String),
    Int64(i64),
    Float64(f64),
    Boolean(bool),
    /// Raw bytes, serialized as a JSON array of numbers.
    Binary(Vec<u8>),
}
216
/// ValueType is the type of a value stored in KeyValue struct.
///
/// Serialized in lowercase: `string`, `int64`, `float64`, `boolean`, `binary`.
// NOTE(review): Jaeger's own JSON model appears to spell the boolean type `bool`
// rather than `boolean`; confirm what clients expect before changing the wire format.
#[derive(Debug, Serialize, Deserialize, PartialEq)]
#[serde(rename_all = "lowercase")]
pub enum ValueType {
    String,
    Int64,
    Float64,
    Boolean,
    Binary,
}
227
/// JaegerQueryParams is the query parameters of Jaeger HTTP API.
///
/// Query keys are matched in camelCase (e.g. `maxDuration`, `spanKind`) unless a
/// field is renamed explicitly.
#[derive(Default, Debug, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct JaegerQueryParams {
    /// Service name of the trace (query key `service`).
    #[serde(rename = "service")]
    pub service_name: Option<String>,

    /// Operation name of the trace (query key `operation`).
    #[serde(rename = "operation")]
    pub operation_name: Option<String>,

    /// Limit the return data. An empty value (`limit=`) is treated as absent.
    #[serde(default, deserialize_with = "empty_string_as_none")]
    pub limit: Option<usize>,

    /// Start time of the trace in microseconds since unix epoch.
    // NOTE(review): unlike `limit`, there is no `empty_string_as_none` here, so an empty
    // `start=` fails to deserialize instead of being ignored. The same applies to `end`.
    pub start: Option<i64>,

    /// End time of the trace in microseconds since unix epoch.
    pub end: Option<i64>,

    /// Max duration string value of the trace (query key `maxDuration`). Units can be
    /// `ns`, `us` (or `µs`), `ms`, `s`, `m`, `h`. An empty value is treated as absent.
    #[serde(default, deserialize_with = "empty_string_as_none")]
    pub max_duration: Option<String>,

    /// Min duration string value of the trace (query key `minDuration`). Units can be
    /// `ns`, `us` (or `µs`), `ms`, `s`, `m`, `h`. An empty value is treated as absent.
    #[serde(default, deserialize_with = "empty_string_as_none")]
    pub min_duration: Option<String>,

    /// Tags of the trace in JSON format. It will be URL encoded in the raw query.
    /// The decoded format is like: tags="{\"http.status_code\":\"200\",\"latency\":\"11.234\",\"error\":\"false\",\"http.method\":\"GET\",\"http.path\":\"/api/v1/users\"}".
    /// Both the keys and the values of the map are strings: each key is a span
    /// attribute name and each value the attribute value, which is converted to the
    /// corresponding type when querying.
    pub tags: Option<String>,

    /// The span kind of the trace (query key `spanKind`).
    pub span_kind: Option<String>,
}
266
267/// Serde deserialization decorator to map empty Strings to None.
268fn empty_string_as_none<'de, D, T>(de: D) -> Result<Option<T>, D::Error>
269where
270    D: Deserializer<'de>,
271    T: FromStr,
272    T::Err: fmt::Display,
273{
274    let opt = Option::<String>::deserialize(de)?;
275    match opt.as_deref() {
276        None | Some("") => Ok(None),
277        Some(s) => FromStr::from_str(s).map_err(de::Error::custom).map(Some),
278    }
279}
280
281fn update_query_context(query_ctx: &mut QueryContext, table_name: Option<String>) {
282    // db should be already handled by middlewares
283    query_ctx.set_channel(Channel::Jaeger);
284    if let Some(table) = table_name {
285        query_ctx.set_extension(JAEGER_QUERY_TABLE_NAME_KEY, table);
286    }
287}
288
289impl QueryTraceParams {
290    fn from_jaeger_query_params(query_params: JaegerQueryParams) -> Result<Self> {
291        let mut internal_query_params: QueryTraceParams = QueryTraceParams {
292            service_name: query_params.service_name.context(InvalidJaegerQuerySnafu {
293                reason: "service_name is required".to_string(),
294            })?,
295            operation_name: query_params.operation_name,
296            // Convert start time from microseconds to nanoseconds.
297            start_time: query_params.start.map(|start| start * 1000),
298            end_time: query_params.end.map(|end| end * 1000),
299            ..Default::default()
300        };
301
302        if let Some(max_duration) = query_params.max_duration {
303            let duration = humantime::parse_duration(&max_duration).map_err(|e| {
304                InvalidJaegerQuerySnafu {
305                    reason: format!("parse maxDuration '{}' failed: {}", max_duration, e),
306                }
307                .build()
308            })?;
309            internal_query_params.max_duration = Some(duration.as_nanos() as u64);
310        }
311
312        if let Some(min_duration) = query_params.min_duration {
313            let duration = humantime::parse_duration(&min_duration).map_err(|e| {
314                InvalidJaegerQuerySnafu {
315                    reason: format!("parse minDuration '{}' failed: {}", min_duration, e),
316                }
317                .build()
318            })?;
319            internal_query_params.min_duration = Some(duration.as_nanos() as u64);
320        }
321
322        if let Some(tags) = query_params.tags {
323            // Serialize the tags to a JSON map.
324            let mut tags_map: HashMap<String, JsonValue> =
325                serde_json::from_str(&tags).map_err(|e| {
326                    InvalidJaegerQuerySnafu {
327                        reason: format!("parse tags '{}' failed: {}", tags, e),
328                    }
329                    .build()
330                })?;
331            for (_, v) in tags_map.iter_mut() {
332                if let Some(number) = convert_string_to_number(v) {
333                    *v = number;
334                }
335                if let Some(boolean) = convert_string_to_boolean(v) {
336                    *v = boolean;
337                }
338            }
339            internal_query_params.tags = Some(tags_map);
340        }
341
342        internal_query_params.limit = query_params.limit;
343
344        Ok(internal_query_params)
345    }
346}
347
/// Normalized trace-search parameters, built from `JaegerQueryParams` by
/// `QueryTraceParams::from_jaeger_query_params`.
#[derive(Debug, Default, PartialEq)]
pub struct QueryTraceParams {
    /// Service whose traces are searched (required).
    pub service_name: String,
    /// Restricts the search to this operation, if set.
    pub operation_name: Option<String>,

    /// The maximum number of traces to return.
    pub limit: Option<usize>,

    /// Select the traces with the given tags (span attributes).
    pub tags: Option<HashMap<String, JsonValue>>,

    // The unit of the following time related parameters is nanoseconds.
    pub start_time: Option<i64>,
    pub end_time: Option<i64>,
    pub min_duration: Option<u64>,
    pub max_duration: Option<u64>,

    /// Client that issued the query. It is detected from the `User-Agent` header of
    /// `/api/traces` requests and defaults to `TraceUserAgent::Jaeger`.
    pub user_agent: TraceUserAgent,
}
368
/// Client that issued a trace query, derived from the request's `User-Agent` header.
#[derive(Debug, Default, PartialEq, Eq)]
pub enum TraceUserAgent {
    /// The user agent contains "grafana" (case-insensitive).
    Grafana,
    // Jaeger UI has no distinctive user agent we match on, but this is the Jaeger
    // API, so any other client (or a request without the header) is treated as Jaeger.
    #[default]
    Jaeger,
}
377
378impl From<UserAgent> for TraceUserAgent {
379    fn from(value: UserAgent) -> Self {
380        let ua_str = value.as_str().to_lowercase();
381        debug!("received user agent: {}", ua_str);
382        if ua_str.contains("grafana") {
383            Self::Grafana
384        } else {
385            Self::Jaeger
386        }
387    }
388}
389
390/// Handle the GET `/api/services` request.
391#[axum_macros::debug_handler]
392#[tracing::instrument(skip_all, fields(protocol = "jaeger", request_type = "get_services"))]
393pub async fn handle_get_services(
394    State(handler): State<JaegerQueryHandlerRef>,
395    Query(query_params): Query<JaegerQueryParams>,
396    Extension(mut query_ctx): Extension<QueryContext>,
397    TraceTableName(table_name): TraceTableName,
398) -> impl IntoResponse {
399    debug!(
400        "Received Jaeger '/api/services' request, query_params: {:?}, query_ctx: {:?}",
401        query_params, query_ctx
402    );
403
404    query_ctx.set_channel(Channel::Jaeger);
405    if let Some(table) = table_name {
406        query_ctx.set_extension(JAEGER_QUERY_TABLE_NAME_KEY, table);
407    }
408
409    let query_ctx = Arc::new(query_ctx);
410    let db = query_ctx.get_db_string();
411
412    // Record the query time histogram.
413    let _timer = METRIC_JAEGER_QUERY_ELAPSED
414        .with_label_values(&[&db, "/api/services"])
415        .start_timer();
416
417    match handler.get_services(query_ctx).await {
418        Ok(output) => match covert_to_records(output).await {
419            Ok(Some(records)) => match services_from_records(records) {
420                Ok(services) => {
421                    let services_num = services.len();
422                    (
423                        HttpStatusCode::OK,
424                        axum::Json(JaegerAPIResponse {
425                            data: Some(JaegerData::ServiceNames(services)),
426                            total: services_num,
427                            ..Default::default()
428                        }),
429                    )
430                }
431                Err(err) => {
432                    error!("Failed to get services: {:?}", err);
433                    error_response(err)
434                }
435            },
436            Ok(None) => (HttpStatusCode::OK, axum::Json(JaegerAPIResponse::default())),
437            Err(err) => {
438                error!("Failed to get services: {:?}", err);
439                error_response(err)
440            }
441        },
442        Err(err) => handle_query_error(err, "Failed to get services", &db),
443    }
444}
445
446/// Handle the GET `/api/traces/{trace_id}` request.
447#[axum_macros::debug_handler]
448#[tracing::instrument(skip_all, fields(protocol = "jaeger", request_type = "get_trace"))]
449pub async fn handle_get_trace(
450    State(handler): State<JaegerQueryHandlerRef>,
451    Path(trace_id): Path<String>,
452    Query(query_params): Query<JaegerQueryParams>,
453    Extension(mut query_ctx): Extension<QueryContext>,
454    TraceTableName(table_name): TraceTableName,
455) -> impl IntoResponse {
456    debug!(
457        "Received Jaeger '/api/traces/{}' request, query_params: {:?}, query_ctx: {:?}",
458        trace_id, query_params, query_ctx
459    );
460
461    update_query_context(&mut query_ctx, table_name);
462    let query_ctx = Arc::new(query_ctx);
463    let db = query_ctx.get_db_string();
464
465    // Record the query time histogram.
466    let _timer = METRIC_JAEGER_QUERY_ELAPSED
467        .with_label_values(&[&db, "/api/traces"])
468        .start_timer();
469
470    // Convert start time and end time from microseconds to nanoseconds.
471    let start_time_ns = query_params.start.map(|start_us| start_us * 1000);
472    let end_time_ns = query_params.end.map(|end_us| end_us * 1000);
473
474    let output = match handler
475        .get_trace(
476            query_ctx,
477            &trace_id,
478            start_time_ns,
479            end_time_ns,
480            query_params.limit,
481        )
482        .await
483    {
484        Ok(output) => output,
485        Err(err) => {
486            return handle_query_error(
487                err,
488                &format!("Failed to get trace for '{}'", trace_id),
489                &db,
490            );
491        }
492    };
493
494    match covert_to_records(output).await {
495        Ok(Some(records)) => match traces_from_records(records) {
496            Ok(traces) if traces.is_empty() => (
497                HttpStatusCode::NOT_FOUND,
498                axum::Json(JaegerAPIResponse::trace_not_found()),
499            ),
500            Ok(traces) => (
501                HttpStatusCode::OK,
502                axum::Json(JaegerAPIResponse {
503                    data: Some(JaegerData::Traces(traces)),
504                    ..Default::default()
505                }),
506            ),
507            Err(err) => {
508                error!("Failed to get trace '{}': {:?}", trace_id, err);
509                error_response(err)
510            }
511        },
512        Ok(None) => (
513            HttpStatusCode::NOT_FOUND,
514            axum::Json(JaegerAPIResponse::trace_not_found()),
515        ),
516        Err(err) => {
517            error!("Failed to get trace '{}': {:?}", trace_id, err);
518            error_response(err)
519        }
520    }
521}
522
523/// Handle the GET `/api/traces` request.
524#[axum_macros::debug_handler]
525#[tracing::instrument(skip_all, fields(protocol = "jaeger", request_type = "find_traces"))]
526pub async fn handle_find_traces(
527    State(handler): State<JaegerQueryHandlerRef>,
528    Query(query_params): Query<JaegerQueryParams>,
529    Extension(mut query_ctx): Extension<QueryContext>,
530    TraceTableName(table_name): TraceTableName,
531    optional_user_agent: Option<TypedHeader<UserAgent>>,
532) -> impl IntoResponse {
533    debug!(
534        "Received Jaeger '/api/traces' request, query_params: {:?}, query_ctx: {:?}",
535        query_params, query_ctx
536    );
537
538    update_query_context(&mut query_ctx, table_name);
539    let query_ctx = Arc::new(query_ctx);
540    let db = query_ctx.get_db_string();
541
542    // Record the query time histogram.
543    let _timer = METRIC_JAEGER_QUERY_ELAPSED
544        .with_label_values(&[&db, "/api/traces"])
545        .start_timer();
546
547    match QueryTraceParams::from_jaeger_query_params(query_params) {
548        Ok(mut query_params) => {
549            if let Some(TypedHeader(user_agent)) = optional_user_agent {
550                query_params.user_agent = user_agent.into();
551            }
552            let output = handler.find_traces(query_ctx, query_params).await;
553            match output {
554                Ok(output) => match covert_to_records(output).await {
555                    Ok(Some(records)) => match traces_from_records(records) {
556                        Ok(traces) => (
557                            HttpStatusCode::OK,
558                            axum::Json(JaegerAPIResponse {
559                                data: Some(JaegerData::Traces(traces)),
560                                ..Default::default()
561                            }),
562                        ),
563                        Err(err) => {
564                            error!("Failed to find traces: {:?}", err);
565                            error_response(err)
566                        }
567                    },
568                    Ok(None) => (HttpStatusCode::OK, axum::Json(JaegerAPIResponse::default())),
569                    Err(err) => error_response(err),
570                },
571                Err(err) => handle_query_error(err, "Failed to find traces", &db),
572            }
573        }
574        Err(e) => error_response(e),
575    }
576}
577
578/// Handle the GET `/api/operations` request.
579#[axum_macros::debug_handler]
580#[tracing::instrument(skip_all, fields(protocol = "jaeger", request_type = "get_operations"))]
581pub async fn handle_get_operations(
582    State(handler): State<JaegerQueryHandlerRef>,
583    Query(query_params): Query<JaegerQueryParams>,
584    Extension(mut query_ctx): Extension<QueryContext>,
585    TraceTableName(table_name): TraceTableName,
586    headers: HeaderMap,
587) -> impl IntoResponse {
588    debug!(
589        "Received Jaeger '/api/operations' request, query_params: {:?}, query_ctx: {:?}, headers: {:?}",
590        query_params, query_ctx, headers
591    );
592
593    if let Some(service_name) = &query_params.service_name {
594        update_query_context(&mut query_ctx, table_name);
595        let query_ctx = Arc::new(query_ctx);
596        let db = query_ctx.get_db_string();
597
598        // Record the query time histogram.
599        let _timer = METRIC_JAEGER_QUERY_ELAPSED
600            .with_label_values(&[&db, "/api/operations"])
601            .start_timer();
602
603        match handler
604            .get_operations(query_ctx, service_name, query_params.span_kind.as_deref())
605            .await
606        {
607            Ok(output) => match covert_to_records(output).await {
608                Ok(Some(records)) => match operations_from_records(records, true) {
609                    Ok(operations) => {
610                        let total = operations.len();
611                        (
612                            HttpStatusCode::OK,
613                            axum::Json(JaegerAPIResponse {
614                                data: Some(JaegerData::Operations(operations)),
615                                total,
616                                ..Default::default()
617                            }),
618                        )
619                    }
620                    Err(err) => {
621                        error!("Failed to get operations: {:?}", err);
622                        error_response(err)
623                    }
624                },
625                Ok(None) => (HttpStatusCode::OK, axum::Json(JaegerAPIResponse::default())),
626                Err(err) => error_response(err),
627            },
628            Err(err) => handle_query_error(
629                err,
630                &format!("Failed to get operations for service '{}'", service_name),
631                &db,
632            ),
633        }
634    } else {
635        (
636            HttpStatusCode::BAD_REQUEST,
637            axum::Json(JaegerAPIResponse {
638                errors: vec![JaegerAPIError {
639                    code: 400,
640                    msg: "parameter 'service' is required".to_string(),
641                    trace_id: None,
642                }],
643                ..Default::default()
644            }),
645        )
646    }
647}
648
649/// Handle the GET `/api/services/{service_name}/operations` request.
650#[axum_macros::debug_handler]
651#[tracing::instrument(
652    skip_all,
653    fields(protocol = "jaeger", request_type = "get_operations_by_service")
654)]
655pub async fn handle_get_operations_by_service(
656    State(handler): State<JaegerQueryHandlerRef>,
657    Path(service_name): Path<String>,
658    Query(query_params): Query<JaegerQueryParams>,
659    Extension(mut query_ctx): Extension<QueryContext>,
660    TraceTableName(table_name): TraceTableName,
661    headers: HeaderMap,
662) -> impl IntoResponse {
663    debug!(
664        "Received Jaeger '/api/services/{}/operations' request, query_params: {:?}, query_ctx: {:?}, headers: {:?}",
665        service_name, query_params, query_ctx, headers
666    );
667
668    update_query_context(&mut query_ctx, table_name);
669    let query_ctx = Arc::new(query_ctx);
670    let db = query_ctx.get_db_string();
671
672    // Record the query time histogram.
673    let _timer = METRIC_JAEGER_QUERY_ELAPSED
674        .with_label_values(&[&db, "/api/services"])
675        .start_timer();
676
677    match handler.get_operations(query_ctx, &service_name, None).await {
678        Ok(output) => match covert_to_records(output).await {
679            Ok(Some(records)) => match operations_from_records(records, false) {
680                Ok(operations) => {
681                    let operations: Vec<String> =
682                        operations.into_iter().map(|op| op.name).collect();
683                    let total = operations.len();
684                    (
685                        HttpStatusCode::OK,
686                        axum::Json(JaegerAPIResponse {
687                            data: Some(JaegerData::OperationsNames(operations)),
688                            total,
689                            ..Default::default()
690                        }),
691                    )
692                }
693                Err(err) => {
694                    error!(
695                        "Failed to get operations for service '{}': {:?}",
696                        service_name, err
697                    );
698                    error_response(err)
699                }
700            },
701            Ok(None) => (HttpStatusCode::OK, axum::Json(JaegerAPIResponse::default())),
702            Err(err) => error_response(err),
703        },
704        Err(err) => handle_query_error(
705            err,
706            &format!("Failed to get operations for service '{}'", service_name),
707            &db,
708        ),
709    }
710}
711
712async fn covert_to_records(output: Output) -> Result<Option<HttpRecordsOutput>> {
713    match output.data {
714        OutputData::Stream(stream) => {
715            let records = HttpRecordsOutput::try_new(
716                stream.schema().clone(),
717                util::collect(stream)
718                    .await
719                    .context(CollectRecordbatchSnafu)?,
720            )?;
721            debug!(
722                "The query records: {}",
723                serde_json::to_string(&records).unwrap()
724            );
725            Ok(Some(records))
726        }
727        // It's unlikely to happen. However, if the output is not a stream, return None.
728        _ => Ok(None),
729    }
730}
731
732fn handle_query_error(
733    err: Error,
734    prompt: &str,
735    db: &str,
736) -> (HttpStatusCode, axum::Json<JaegerAPIResponse>) {
737    // To compatible with the Jaeger API, if the trace table is not found, return an empty response instead of an error.
738    if err.status_code() == StatusCode::TableNotFound {
739        warn!(
740            "No trace table '{}' found in database '{}'",
741            TRACE_TABLE_NAME, db
742        );
743        (HttpStatusCode::OK, axum::Json(JaegerAPIResponse::default()))
744    } else {
745        error!("{}: {:?}", prompt, err);
746        error_response(err)
747    }
748}
749
750fn error_response(err: Error) -> (HttpStatusCode, axum::Json<JaegerAPIResponse>) {
751    (
752        status_code_to_http_status(&err.status_code()),
753        axum::Json(JaegerAPIResponse {
754            errors: vec![JaegerAPIError {
755                code: err.status_code() as i32,
756                msg: err.to_string(),
757                ..Default::default()
758            }],
759            ..Default::default()
760        }),
761    )
762}
763
764fn traces_from_records(records: HttpRecordsOutput) -> Result<Vec<Trace>> {
765    // maintain the mapping: trace_id -> (process_id -> service_name).
766    let mut trace_id_to_processes: HashMap<String, HashMap<String, String>> = HashMap::new();
767    // maintain the mapping: trace_id -> spans.
768    // use BTreeMap to retain order
769    let mut trace_id_to_spans: BTreeMap<String, Vec<Span>> = BTreeMap::new();
770    // maintain the mapping: service.name -> resource.attributes.
771    let mut service_to_resource_attributes: HashMap<String, Vec<KeyValue>> = HashMap::new();
772
773    let is_span_attributes_flatten = !records
774        .schema
775        .column_schemas
776        .iter()
777        .any(|c| c.name == SPAN_ATTRIBUTES_COLUMN);
778
779    for row in records.rows.into_iter() {
780        let mut span = Span::default();
781        let mut service_name = None;
782        let mut resource_tags = vec![];
783
784        for (idx, cell) in row.into_iter().enumerate() {
785            // safe to use index here
786            let column_name = &records.schema.column_schemas[idx].name;
787
788            match column_name.as_str() {
789                TRACE_ID_COLUMN => {
790                    if let JsonValue::String(trace_id) = cell {
791                        span.trace_id = trace_id.clone();
792                        trace_id_to_processes.entry(trace_id).or_default();
793                    }
794                }
795                TIMESTAMP_COLUMN => {
796                    span.start_time = cell.as_u64().context(InvalidJaegerQuerySnafu {
797                        reason: "Failed to convert timestamp to u64".to_string(),
798                    })? / 1000;
799                }
800                DURATION_NANO_COLUMN => {
801                    span.duration = cell.as_u64().context(InvalidJaegerQuerySnafu {
802                        reason: "Failed to convert duration to u64".to_string(),
803                    })? / 1000;
804                }
805                SERVICE_NAME_COLUMN => {
806                    if let JsonValue::String(name) = cell {
807                        service_name = Some(name);
808                    }
809                }
810                SPAN_NAME_COLUMN => {
811                    if let JsonValue::String(span_name) = cell {
812                        span.operation_name = span_name;
813                    }
814                }
815                SPAN_ID_COLUMN => {
816                    if let JsonValue::String(span_id) = cell {
817                        span.span_id = span_id;
818                    }
819                }
820                SPAN_ATTRIBUTES_COLUMN => {
821                    // for v0 data model, span_attributes are nested as a json
822                    // data structure
823                    if let JsonValue::Object(span_attrs) = cell {
824                        span.tags.extend(object_to_tags(span_attrs));
825                    }
826                }
827                RESOURCE_ATTRIBUTES_COLUMN => {
828                    // for v0 data model, resource_attributes are nested as a json
829                    // data structure
830
831                    if let JsonValue::Object(mut resource_attrs) = cell {
832                        resource_attrs.remove(KEY_SERVICE_NAME);
833                        resource_tags = object_to_tags(resource_attrs);
834                    }
835                }
836                PARENT_SPAN_ID_COLUMN => {
837                    if let JsonValue::String(parent_span_id) = cell
838                        && !parent_span_id.is_empty()
839                    {
840                        span.references.push(Reference {
841                            trace_id: span.trace_id.clone(),
842                            span_id: parent_span_id,
843                            ref_type: REF_TYPE_CHILD_OF.to_string(),
844                        });
845                    }
846                }
847                SPAN_EVENTS_COLUMN => {
848                    if let JsonValue::Array(events) = cell {
849                        for event in events {
850                            if let JsonValue::Object(mut obj) = event {
851                                let Some(action) = obj.get("name").and_then(|v| v.as_str()) else {
852                                    continue;
853                                };
854
855                                let Some(t) =
856                                    obj.get("time").and_then(|t| t.as_str()).and_then(|s| {
857                                        SPAN_KIND_TIME_FMTS
858                                            .iter()
859                                            .find_map(|fmt| {
860                                                chrono::DateTime::parse_from_str(s, fmt).ok()
861                                            })
862                                            .map(|dt| dt.timestamp_micros() as u64)
863                                    })
864                                else {
865                                    continue;
866                                };
867
868                                let mut fields = vec![KeyValue {
869                                    key: "event".to_string(),
870                                    value_type: ValueType::String,
871                                    value: Value::String(action.to_string()),
872                                }];
873
874                                // Add event attributes as fields
875                                if let Some(JsonValue::Object(attrs)) = obj.remove("attributes") {
876                                    fields.extend(object_to_tags(attrs));
877                                }
878
879                                span.logs.push(Log {
880                                    timestamp: t,
881                                    fields,
882                                });
883                            }
884                        }
885                    }
886                }
887                SCOPE_NAME_COLUMN => {
888                    if let JsonValue::String(scope_name) = cell
889                        && !scope_name.is_empty()
890                    {
891                        span.tags.push(KeyValue {
892                            key: KEY_OTEL_SCOPE_NAME.to_string(),
893                            value_type: ValueType::String,
894                            value: Value::String(scope_name),
895                        });
896                    }
897                }
898                SCOPE_VERSION_COLUMN => {
899                    if let JsonValue::String(scope_version) = cell
900                        && !scope_version.is_empty()
901                    {
902                        span.tags.push(KeyValue {
903                            key: KEY_OTEL_SCOPE_VERSION.to_string(),
904                            value_type: ValueType::String,
905                            value: Value::String(scope_version),
906                        });
907                    }
908                }
909                SPAN_KIND_COLUMN => {
910                    if let JsonValue::String(span_kind) = cell
911                        && !span_kind.is_empty()
912                    {
913                        span.tags.push(KeyValue {
914                            key: KEY_SPAN_KIND.to_string(),
915                            value_type: ValueType::String,
916                            value: Value::String(normalize_span_kind(&span_kind)),
917                        });
918                    }
919                }
920                SPAN_STATUS_CODE => {
921                    if let JsonValue::String(span_status) = cell
922                        && span_status != SPAN_STATUS_UNSET
923                        && !span_status.is_empty()
924                    {
925                        span.tags.push(KeyValue {
926                            key: KEY_OTEL_STATUS_CODE.to_string(),
927                            value_type: ValueType::String,
928                            value: Value::String(normalize_status_code(&span_status)),
929                        });
930                        // set error to comply with the Jaeger API
931                        if span_status == SPAN_STATUS_ERROR {
932                            span.tags.push(KeyValue {
933                                key: KEY_OTEL_STATUS_ERROR_KEY.to_string(),
934                                value_type: ValueType::Boolean,
935                                value: Value::Boolean(true),
936                            });
937                        }
938                    }
939                }
940
941                SPAN_STATUS_MESSAGE_COLUMN => {
942                    if let JsonValue::String(span_status_message) = cell
943                        && !span_status_message.is_empty()
944                    {
945                        span.tags.push(KeyValue {
946                            key: KEY_OTEL_STATUS_MESSAGE.to_string(),
947                            value_type: ValueType::String,
948                            value: Value::String(span_status_message),
949                        });
950                    }
951                }
952
953                TRACE_STATE_COLUMN => {
954                    if let JsonValue::String(trace_state) = cell
955                        && !trace_state.is_empty()
956                    {
957                        span.tags.push(KeyValue {
958                            key: KEY_OTEL_TRACE_STATE.to_string(),
959                            value_type: ValueType::String,
960                            value: Value::String(trace_state),
961                        });
962                    }
963                }
964
965                _ => {
966                    // this this v1 data model
967                    if is_span_attributes_flatten {
968                        const SPAN_ATTR_PREFIX: &str = "span_attributes.";
969                        const RESOURCE_ATTR_PREFIX: &str = "resource_attributes.";
970                        // a span attributes column
971                        if column_name.starts_with(SPAN_ATTR_PREFIX) {
972                            if let Some(keyvalue) = to_keyvalue(
973                                column_name
974                                    .strip_prefix(SPAN_ATTR_PREFIX)
975                                    .unwrap_or_default()
976                                    .to_string(),
977                                cell,
978                            ) {
979                                span.tags.push(keyvalue);
980                            }
981                        } else if column_name.starts_with(RESOURCE_ATTR_PREFIX)
982                            && let Some(keyvalue) = to_keyvalue(
983                                column_name
984                                    .strip_prefix(RESOURCE_ATTR_PREFIX)
985                                    .unwrap_or_default()
986                                    .to_string(),
987                                cell,
988                            )
989                        {
990                            resource_tags.push(keyvalue);
991                        }
992                    }
993                }
994            }
995        }
996
997        if let Some(service_name) = service_name {
998            if !service_to_resource_attributes.contains_key(&service_name) {
999                service_to_resource_attributes.insert(service_name.clone(), resource_tags);
1000            }
1001
1002            if let Some(process) = trace_id_to_processes.get_mut(&span.trace_id) {
1003                if let Some(process_id) = process.get(&service_name) {
1004                    span.process_id = process_id.clone();
1005                } else {
1006                    // Allocate a new process id.
1007                    let process_id = format!("p{}", process.len() + 1);
1008                    process.insert(service_name, process_id.clone());
1009                    span.process_id = process_id;
1010                }
1011            }
1012        }
1013
1014        // ensure span tags order
1015        span.tags.sort_by(|a, b| a.key.cmp(&b.key));
1016
1017        if let Some(spans) = trace_id_to_spans.get_mut(&span.trace_id) {
1018            spans.push(span);
1019        } else {
1020            trace_id_to_spans.insert(span.trace_id.clone(), vec![span]);
1021        }
1022    }
1023
1024    let mut traces = Vec::new();
1025    for (trace_id, spans) in trace_id_to_spans {
1026        let mut trace = Trace {
1027            trace_id,
1028            spans,
1029            ..Default::default()
1030        };
1031
1032        if let Some(processes) = trace_id_to_processes.remove(&trace.trace_id) {
1033            let mut process_id_to_process = HashMap::new();
1034            for (service_name, process_id) in processes.into_iter() {
1035                let tags = service_to_resource_attributes
1036                    .remove(&service_name)
1037                    .unwrap_or_default();
1038                process_id_to_process.insert(process_id, Process { service_name, tags });
1039            }
1040            trace.processes = process_id_to_process;
1041        }
1042        traces.push(trace);
1043    }
1044
1045    Ok(traces)
1046}
1047
1048fn to_keyvalue(key: String, value: JsonValue) -> Option<KeyValue> {
1049    match value {
1050        JsonValue::String(value) => Some(KeyValue {
1051            key,
1052            value_type: ValueType::String,
1053            value: Value::String(value.clone()),
1054        }),
1055        JsonValue::Number(value) => Some(KeyValue {
1056            key,
1057            value_type: ValueType::Int64,
1058            value: Value::Int64(value.as_i64().unwrap_or(0)),
1059        }),
1060        JsonValue::Bool(value) => Some(KeyValue {
1061            key,
1062            value_type: ValueType::Boolean,
1063            value: Value::Boolean(value),
1064        }),
1065        JsonValue::Array(value) => Some(KeyValue {
1066            key,
1067            value_type: ValueType::String,
1068            value: Value::String(serde_json::to_string(&value).unwrap()),
1069        }),
1070        JsonValue::Object(value) => Some(KeyValue {
1071            key,
1072            value_type: ValueType::String,
1073            value: Value::String(serde_json::to_string(&value).unwrap()),
1074        }),
1075        JsonValue::Null => None,
1076    }
1077}
1078
1079fn object_to_tags(object: serde_json::map::Map<String, JsonValue>) -> Vec<KeyValue> {
1080    object
1081        .into_iter()
1082        .filter_map(|(key, value)| to_keyvalue(key, value))
1083        .collect()
1084}
1085
1086fn services_from_records(records: HttpRecordsOutput) -> Result<Vec<String>> {
1087    let expected_schema = vec![(SERVICE_NAME_COLUMN, "String")];
1088    check_schema(&records, &expected_schema)?;
1089
1090    let mut services = Vec::with_capacity(records.total_rows);
1091    for row in records.rows.into_iter() {
1092        for value in row.into_iter() {
1093            if let JsonValue::String(service_name) = value {
1094                services.push(service_name);
1095            }
1096        }
1097    }
1098    Ok(services)
1099}
1100
1101// Construct Jaeger operations from records.
1102fn operations_from_records(
1103    records: HttpRecordsOutput,
1104    contain_span_kind: bool,
1105) -> Result<Vec<Operation>> {
1106    let expected_schema = vec![(SPAN_NAME_COLUMN, "String"), (SPAN_KIND_COLUMN, "String")];
1107    check_schema(&records, &expected_schema)?;
1108
1109    let mut operations = Vec::with_capacity(records.total_rows);
1110    for row in records.rows.into_iter() {
1111        let mut row_iter = row.into_iter();
1112        if let Some(JsonValue::String(operation)) = row_iter.next() {
1113            let mut operation = Operation {
1114                name: operation,
1115                span_kind: None,
1116            };
1117            if contain_span_kind {
1118                if let Some(JsonValue::String(span_kind)) = row_iter.next() {
1119                    operation.span_kind = Some(normalize_span_kind(&span_kind));
1120                }
1121            } else {
1122                // skip span kind.
1123                row_iter.next();
1124            }
1125            operations.push(operation);
1126        }
1127    }
1128
1129    Ok(operations)
1130}
1131
1132// Check whether the schema of the records is correct.
1133fn check_schema(records: &HttpRecordsOutput, expected_schema: &[(&str, &str)]) -> Result<()> {
1134    for (i, column) in records.schema.column_schemas.iter().enumerate() {
1135        if column.name != expected_schema[i].0 || column.data_type != expected_schema[i].1 {
1136            InvalidJaegerQuerySnafu {
1137                reason: "query result schema is not correct".to_string(),
1138            }
1139            .fail()?
1140        }
1141    }
1142    Ok(())
1143}
1144
1145// By default, the span kind is stored as `SPAN_KIND_<kind>` in GreptimeDB.
1146// However, in Jaeger API, the span kind is returned as `<kind>` which is the lowercase of the span kind and without the `SPAN_KIND_` prefix.
1147fn normalize_span_kind(span_kind: &str) -> String {
1148    // If the span_kind starts with `SPAN_KIND_` prefix, remove it and convert to lowercase.
1149    if let Some(stripped) = span_kind.strip_prefix(SPAN_KIND_PREFIX) {
1150        stripped.to_lowercase()
1151    } else {
1152        // It's unlikely to happen. However, we still convert it to lowercase for consistency.
1153        span_kind.to_lowercase()
1154    }
1155}
1156
1157// By default, the status code is stored as `STATUS_CODE_<code>` in GreptimeDB.
1158// However, in Jaeger API, the status code is returned as `<code>` without the `STATUS_CODE_` prefix.
1159fn normalize_status_code(status_code: &str) -> String {
1160    // If the span_kind starts with `SPAN_KIND_` prefix, remove it and convert to lowercase.
1161    if let Some(stripped) = status_code.strip_prefix(SPAN_STATUS_PREFIX) {
1162        stripped.to_string()
1163    } else {
1164        // It's unlikely to happen
1165        status_code.to_string()
1166    }
1167}
1168
1169fn convert_string_to_number(input: &serde_json::Value) -> Option<serde_json::Value> {
1170    if let Some(data) = input.as_str() {
1171        if let Ok(number) = data.parse::<i64>() {
1172            return Some(serde_json::Value::Number(serde_json::Number::from(number)));
1173        }
1174        if let Ok(number) = data.parse::<f64>()
1175            && let Some(number) = serde_json::Number::from_f64(number)
1176        {
1177            return Some(serde_json::Value::Number(number));
1178        }
1179    }
1180
1181    None
1182}
1183
1184fn convert_string_to_boolean(input: &serde_json::Value) -> Option<serde_json::Value> {
1185    if let Some(data) = input.as_str() {
1186        if data == "true" {
1187            return Some(serde_json::Value::Bool(true));
1188        }
1189        if data == "false" {
1190            return Some(serde_json::Value::Bool(false));
1191        }
1192    }
1193
1194    None
1195}
1196
1197#[cfg(test)]
1198mod tests {
1199    use serde_json::{Number, Value as JsonValue, json};
1200
1201    use super::*;
1202    use crate::http::{ColumnSchema, HttpRecordsOutput, OutputSchema};
1203
1204    #[test]
1205    fn test_services_from_records() {
1206        // The tests is the tuple of `(test_records, expected)`.
1207        let tests = vec![(
1208            HttpRecordsOutput {
1209                schema: OutputSchema {
1210                    column_schemas: vec![ColumnSchema {
1211                        name: "service_name".to_string(),
1212                        data_type: "String".to_string(),
1213                    }],
1214                },
1215                rows: vec![
1216                    vec![JsonValue::String("test-service-0".to_string())],
1217                    vec![JsonValue::String("test-service-1".to_string())],
1218                ],
1219                total_rows: 2,
1220                metrics: HashMap::new(),
1221            },
1222            vec!["test-service-0".to_string(), "test-service-1".to_string()],
1223        )];
1224
1225        for (records, expected) in tests {
1226            let services = services_from_records(records).unwrap();
1227            assert_eq!(services, expected);
1228        }
1229    }
1230
1231    #[test]
1232    fn test_operations_from_records() {
1233        // The tests is the tuple of `(test_records, contain_span_kind, expected)`.
1234        let tests = vec![
1235            (
1236                HttpRecordsOutput {
1237                    schema: OutputSchema {
1238                        column_schemas: vec![
1239                            ColumnSchema {
1240                                name: "span_name".to_string(),
1241                                data_type: "String".to_string(),
1242                            },
1243                            ColumnSchema {
1244                                name: "span_kind".to_string(),
1245                                data_type: "String".to_string(),
1246                            },
1247                        ],
1248                    },
1249                    rows: vec![
1250                        vec![
1251                            JsonValue::String("access-mysql".to_string()),
1252                            JsonValue::String("SPAN_KIND_SERVER".to_string()),
1253                        ],
1254                        vec![
1255                            JsonValue::String("access-redis".to_string()),
1256                            JsonValue::String("SPAN_KIND_CLIENT".to_string()),
1257                        ],
1258                    ],
1259                    total_rows: 2,
1260                    metrics: HashMap::new(),
1261                },
1262                false,
1263                vec![
1264                    Operation {
1265                        name: "access-mysql".to_string(),
1266                        span_kind: None,
1267                    },
1268                    Operation {
1269                        name: "access-redis".to_string(),
1270                        span_kind: None,
1271                    },
1272                ],
1273            ),
1274            (
1275                HttpRecordsOutput {
1276                    schema: OutputSchema {
1277                        column_schemas: vec![
1278                            ColumnSchema {
1279                                name: "span_name".to_string(),
1280                                data_type: "String".to_string(),
1281                            },
1282                            ColumnSchema {
1283                                name: "span_kind".to_string(),
1284                                data_type: "String".to_string(),
1285                            },
1286                        ],
1287                    },
1288                    rows: vec![
1289                        vec![
1290                            JsonValue::String("access-mysql".to_string()),
1291                            JsonValue::String("SPAN_KIND_SERVER".to_string()),
1292                        ],
1293                        vec![
1294                            JsonValue::String("access-redis".to_string()),
1295                            JsonValue::String("SPAN_KIND_CLIENT".to_string()),
1296                        ],
1297                    ],
1298                    total_rows: 2,
1299                    metrics: HashMap::new(),
1300                },
1301                true,
1302                vec![
1303                    Operation {
1304                        name: "access-mysql".to_string(),
1305                        span_kind: Some("server".to_string()),
1306                    },
1307                    Operation {
1308                        name: "access-redis".to_string(),
1309                        span_kind: Some("client".to_string()),
1310                    },
1311                ],
1312            ),
1313        ];
1314
1315        for (records, contain_span_kind, expected) in tests {
1316            let operations = operations_from_records(records, contain_span_kind).unwrap();
1317            assert_eq!(operations, expected);
1318        }
1319    }
1320
1321    #[test]
1322    fn test_traces_from_records() {
1323        // The tests is the tuple of `(test_records, expected)`.
1324        let tests = vec![(
1325            HttpRecordsOutput {
1326                schema: OutputSchema {
1327                    column_schemas: vec![
1328                        ColumnSchema {
1329                            name: "trace_id".to_string(),
1330                            data_type: "String".to_string(),
1331                        },
1332                        ColumnSchema {
1333                            name: "timestamp".to_string(),
1334                            data_type: "TimestampNanosecond".to_string(),
1335                        },
1336                        ColumnSchema {
1337                            name: "duration_nano".to_string(),
1338                            data_type: "UInt64".to_string(),
1339                        },
1340                        ColumnSchema {
1341                            name: "service_name".to_string(),
1342                            data_type: "String".to_string(),
1343                        },
1344                        ColumnSchema {
1345                            name: "span_name".to_string(),
1346                            data_type: "String".to_string(),
1347                        },
1348                        ColumnSchema {
1349                            name: "span_id".to_string(),
1350                            data_type: "String".to_string(),
1351                        },
1352                        ColumnSchema {
1353                            name: "span_attributes".to_string(),
1354                            data_type: "Json".to_string(),
1355                        },
1356                    ],
1357                },
1358                rows: vec![
1359                    vec![
1360                        JsonValue::String("5611dce1bc9ebed65352d99a027b08ea".to_string()),
1361                        JsonValue::Number(Number::from_u128(1738726754492422000).unwrap()),
1362                        JsonValue::Number(Number::from_u128(100000000).unwrap()),
1363                        JsonValue::String("test-service-0".to_string()),
1364                        JsonValue::String("access-mysql".to_string()),
1365                        JsonValue::String("008421dbbd33a3e9".to_string()),
1366                        JsonValue::Object(
1367                            json!({
1368                                "operation.type": "access-mysql",
1369                            })
1370                            .as_object()
1371                            .unwrap()
1372                            .clone(),
1373                        ),
1374                    ],
1375                    vec![
1376                        JsonValue::String("5611dce1bc9ebed65352d99a027b08ea".to_string()),
1377                        JsonValue::Number(Number::from_u128(1738726754642422000).unwrap()),
1378                        JsonValue::Number(Number::from_u128(100000000).unwrap()),
1379                        JsonValue::String("test-service-0".to_string()),
1380                        JsonValue::String("access-redis".to_string()),
1381                        JsonValue::String("ffa03416a7b9ea48".to_string()),
1382                        JsonValue::Object(
1383                            json!({
1384                                "operation.type": "access-redis",
1385                            })
1386                            .as_object()
1387                            .unwrap()
1388                            .clone(),
1389                        ),
1390                    ],
1391                ],
1392                total_rows: 2,
1393                metrics: HashMap::new(),
1394            },
1395            vec![Trace {
1396                trace_id: "5611dce1bc9ebed65352d99a027b08ea".to_string(),
1397                spans: vec![
1398                    Span {
1399                        trace_id: "5611dce1bc9ebed65352d99a027b08ea".to_string(),
1400                        span_id: "008421dbbd33a3e9".to_string(),
1401                        operation_name: "access-mysql".to_string(),
1402                        start_time: 1738726754492422,
1403                        duration: 100000,
1404                        tags: vec![KeyValue {
1405                            key: "operation.type".to_string(),
1406                            value_type: ValueType::String,
1407                            value: Value::String("access-mysql".to_string()),
1408                        }],
1409                        process_id: "p1".to_string(),
1410                        ..Default::default()
1411                    },
1412                    Span {
1413                        trace_id: "5611dce1bc9ebed65352d99a027b08ea".to_string(),
1414                        span_id: "ffa03416a7b9ea48".to_string(),
1415                        operation_name: "access-redis".to_string(),
1416                        start_time: 1738726754642422,
1417                        duration: 100000,
1418                        tags: vec![KeyValue {
1419                            key: "operation.type".to_string(),
1420                            value_type: ValueType::String,
1421                            value: Value::String("access-redis".to_string()),
1422                        }],
1423                        process_id: "p1".to_string(),
1424                        ..Default::default()
1425                    },
1426                ],
1427                processes: HashMap::from([(
1428                    "p1".to_string(),
1429                    Process {
1430                        service_name: "test-service-0".to_string(),
1431                        tags: vec![],
1432                    },
1433                )]),
1434                ..Default::default()
1435            }],
1436        )];
1437
1438        for (records, expected) in tests {
1439            let traces = traces_from_records(records).unwrap();
1440            assert_eq!(traces, expected);
1441        }
1442    }
1443
1444    #[test]
1445    fn test_traces_from_v1_records() {
1446        // The tests is the tuple of `(test_records, expected)`.
1447        let tests = vec![(
1448            HttpRecordsOutput {
1449                schema: OutputSchema {
1450                    column_schemas: vec![
1451                        ColumnSchema {
1452                            name: "trace_id".to_string(),
1453                            data_type: "String".to_string(),
1454                        },
1455                        ColumnSchema {
1456                            name: "timestamp".to_string(),
1457                            data_type: "TimestampNanosecond".to_string(),
1458                        },
1459                        ColumnSchema {
1460                            name: "duration_nano".to_string(),
1461                            data_type: "UInt64".to_string(),
1462                        },
1463                        ColumnSchema {
1464                            name: "service_name".to_string(),
1465                            data_type: "String".to_string(),
1466                        },
1467                        ColumnSchema {
1468                            name: "span_name".to_string(),
1469                            data_type: "String".to_string(),
1470                        },
1471                        ColumnSchema {
1472                            name: "span_id".to_string(),
1473                            data_type: "String".to_string(),
1474                        },
1475                        ColumnSchema {
1476                            name: "span_attributes.http.request.method".to_string(),
1477                            data_type: "String".to_string(),
1478                        },
1479                        ColumnSchema {
1480                            name: "span_attributes.http.request.url".to_string(),
1481                            data_type: "String".to_string(),
1482                        },
1483                        ColumnSchema {
1484                            name: "span_attributes.http.status_code".to_string(),
1485                            data_type: "UInt64".to_string(),
1486                        },
1487                    ],
1488                },
1489                rows: vec![
1490                    vec![
1491                        JsonValue::String("5611dce1bc9ebed65352d99a027b08ea".to_string()),
1492                        JsonValue::Number(Number::from_u128(1738726754492422000).unwrap()),
1493                        JsonValue::Number(Number::from_u128(100000000).unwrap()),
1494                        JsonValue::String("test-service-0".to_string()),
1495                        JsonValue::String("access-mysql".to_string()),
1496                        JsonValue::String("008421dbbd33a3e9".to_string()),
1497                        JsonValue::String("GET".to_string()),
1498                        JsonValue::String("/data".to_string()),
1499                        JsonValue::Number(Number::from_u128(200).unwrap()),
1500                    ],
1501                    vec![
1502                        JsonValue::String("5611dce1bc9ebed65352d99a027b08ea".to_string()),
1503                        JsonValue::Number(Number::from_u128(1738726754642422000).unwrap()),
1504                        JsonValue::Number(Number::from_u128(100000000).unwrap()),
1505                        JsonValue::String("test-service-0".to_string()),
1506                        JsonValue::String("access-redis".to_string()),
1507                        JsonValue::String("ffa03416a7b9ea48".to_string()),
1508                        JsonValue::String("POST".to_string()),
1509                        JsonValue::String("/create".to_string()),
1510                        JsonValue::Number(Number::from_u128(400).unwrap()),
1511                    ],
1512                ],
1513                total_rows: 2,
1514                metrics: HashMap::new(),
1515            },
1516            vec![Trace {
1517                trace_id: "5611dce1bc9ebed65352d99a027b08ea".to_string(),
1518                spans: vec![
1519                    Span {
1520                        trace_id: "5611dce1bc9ebed65352d99a027b08ea".to_string(),
1521                        span_id: "008421dbbd33a3e9".to_string(),
1522                        operation_name: "access-mysql".to_string(),
1523                        start_time: 1738726754492422,
1524                        duration: 100000,
1525                        tags: vec![
1526                            KeyValue {
1527                                key: "http.request.method".to_string(),
1528                                value_type: ValueType::String,
1529                                value: Value::String("GET".to_string()),
1530                            },
1531                            KeyValue {
1532                                key: "http.request.url".to_string(),
1533                                value_type: ValueType::String,
1534                                value: Value::String("/data".to_string()),
1535                            },
1536                            KeyValue {
1537                                key: "http.status_code".to_string(),
1538                                value_type: ValueType::Int64,
1539                                value: Value::Int64(200),
1540                            },
1541                        ],
1542                        process_id: "p1".to_string(),
1543                        ..Default::default()
1544                    },
1545                    Span {
1546                        trace_id: "5611dce1bc9ebed65352d99a027b08ea".to_string(),
1547                        span_id: "ffa03416a7b9ea48".to_string(),
1548                        operation_name: "access-redis".to_string(),
1549                        start_time: 1738726754642422,
1550                        duration: 100000,
1551                        tags: vec![
1552                            KeyValue {
1553                                key: "http.request.method".to_string(),
1554                                value_type: ValueType::String,
1555                                value: Value::String("POST".to_string()),
1556                            },
1557                            KeyValue {
1558                                key: "http.request.url".to_string(),
1559                                value_type: ValueType::String,
1560                                value: Value::String("/create".to_string()),
1561                            },
1562                            KeyValue {
1563                                key: "http.status_code".to_string(),
1564                                value_type: ValueType::Int64,
1565                                value: Value::Int64(400),
1566                            },
1567                        ],
1568                        process_id: "p1".to_string(),
1569                        ..Default::default()
1570                    },
1571                ],
1572                processes: HashMap::from([(
1573                    "p1".to_string(),
1574                    Process {
1575                        service_name: "test-service-0".to_string(),
1576                        tags: vec![],
1577                    },
1578                )]),
1579                ..Default::default()
1580            }],
1581        )];
1582
1583        for (records, expected) in tests {
1584            let traces = traces_from_records(records).unwrap();
1585            assert_eq!(traces, expected);
1586        }
1587    }
1588
1589    #[test]
1590    fn test_from_jaeger_query_params() {
1591        // The tests is the tuple of `(test_query_params, expected)`.
1592        let tests = vec![
1593            (
1594                JaegerQueryParams {
1595                    service_name: Some("test-service-0".to_string()),
1596                    ..Default::default()
1597                },
1598                QueryTraceParams {
1599                    service_name: "test-service-0".to_string(),
1600                    ..Default::default()
1601                },
1602            ),
1603            (
1604                JaegerQueryParams {
1605                    service_name: Some("test-service-0".to_string()),
1606                    operation_name: Some("access-mysql".to_string()),
1607                    start: Some(1738726754492422),
1608                    end: Some(1738726754642422),
1609                    max_duration: Some("100ms".to_string()),
1610                    min_duration: Some("50ms".to_string()),
1611                    limit: Some(10),
1612                    tags: Some("{\"http.status_code\":\"200\",\"latency\":\"11.234\",\"error\":\"false\",\"http.method\":\"GET\",\"http.path\":\"/api/v1/users\"}".to_string()),
1613                    ..Default::default()
1614                },
1615                QueryTraceParams {
1616                    service_name: "test-service-0".to_string(),
1617                    operation_name: Some("access-mysql".to_string()),
1618                    start_time: Some(1738726754492422000),
1619                    end_time: Some(1738726754642422000),
1620                    min_duration: Some(50000000),
1621                    max_duration: Some(100000000),
1622                    limit: Some(10),
1623                    tags: Some(HashMap::from([
1624                        ("http.status_code".to_string(), JsonValue::Number(Number::from(200))),
1625                        ("latency".to_string(), JsonValue::Number(Number::from_f64(11.234).unwrap())),
1626                        ("error".to_string(), JsonValue::Bool(false)),
1627                        ("http.method".to_string(), JsonValue::String("GET".to_string())),
1628                        ("http.path".to_string(), JsonValue::String("/api/v1/users".to_string())),
1629                    ])),
1630                    user_agent: TraceUserAgent::Jaeger,
1631                },
1632            ),
1633        ];
1634
1635        for (query_params, expected) in tests {
1636            let query_params = QueryTraceParams::from_jaeger_query_params(query_params).unwrap();
1637            assert_eq!(query_params, expected);
1638        }
1639    }
1640
1641    #[test]
1642    fn test_check_schema() {
1643        // The tests is the tuple of `(test_records, expected_schema, is_ok)`.
1644        let tests = vec![(
1645            HttpRecordsOutput {
1646                schema: OutputSchema {
1647                    column_schemas: vec![
1648                        ColumnSchema {
1649                            name: "trace_id".to_string(),
1650                            data_type: "String".to_string(),
1651                        },
1652                        ColumnSchema {
1653                            name: "timestamp".to_string(),
1654                            data_type: "TimestampNanosecond".to_string(),
1655                        },
1656                        ColumnSchema {
1657                            name: "duration_nano".to_string(),
1658                            data_type: "UInt64".to_string(),
1659                        },
1660                        ColumnSchema {
1661                            name: "service_name".to_string(),
1662                            data_type: "String".to_string(),
1663                        },
1664                        ColumnSchema {
1665                            name: "span_name".to_string(),
1666                            data_type: "String".to_string(),
1667                        },
1668                        ColumnSchema {
1669                            name: "span_id".to_string(),
1670                            data_type: "String".to_string(),
1671                        },
1672                        ColumnSchema {
1673                            name: "span_attributes".to_string(),
1674                            data_type: "Json".to_string(),
1675                        },
1676                    ],
1677                },
1678                rows: vec![],
1679                total_rows: 0,
1680                metrics: HashMap::new(),
1681            },
1682            vec![
1683                (TRACE_ID_COLUMN, "String"),
1684                (TIMESTAMP_COLUMN, "TimestampNanosecond"),
1685                (DURATION_NANO_COLUMN, "UInt64"),
1686                (SERVICE_NAME_COLUMN, "String"),
1687                (SPAN_NAME_COLUMN, "String"),
1688                (SPAN_ID_COLUMN, "String"),
1689                (SPAN_ATTRIBUTES_COLUMN, "Json"),
1690            ],
1691            true,
1692        )];
1693
1694        for (records, expected_schema, is_ok) in tests {
1695            let result = check_schema(&records, &expected_schema);
1696            assert_eq!(result.is_ok(), is_ok);
1697        }
1698    }
1699
1700    #[test]
1701    fn test_normalize_span_kind() {
1702        let tests = vec![
1703            ("SPAN_KIND_SERVER".to_string(), "server".to_string()),
1704            ("SPAN_KIND_CLIENT".to_string(), "client".to_string()),
1705        ];
1706
1707        for (input, expected) in tests {
1708            let result = normalize_span_kind(&input);
1709            assert_eq!(result, expected);
1710        }
1711    }
1712
1713    #[test]
1714    fn test_convert_string_to_number() {
1715        let tests = vec![
1716            (
1717                JsonValue::String("123".to_string()),
1718                Some(JsonValue::Number(Number::from(123))),
1719            ),
1720            (
1721                JsonValue::String("123.456".to_string()),
1722                Some(JsonValue::Number(Number::from_f64(123.456).unwrap())),
1723            ),
1724        ];
1725
1726        for (input, expected) in tests {
1727            let result = convert_string_to_number(&input);
1728            assert_eq!(result, expected);
1729        }
1730    }
1731
1732    #[test]
1733    fn test_convert_string_to_boolean() {
1734        let tests = vec![
1735            (
1736                JsonValue::String("true".to_string()),
1737                Some(JsonValue::Bool(true)),
1738            ),
1739            (
1740                JsonValue::String("false".to_string()),
1741                Some(JsonValue::Bool(false)),
1742            ),
1743        ];
1744
1745        for (input, expected) in tests {
1746            let result = convert_string_to_boolean(&input);
1747            assert_eq!(result, expected);
1748        }
1749    }
1750}