servers/http/
jaeger.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, HashMap};
16use std::fmt;
17use std::str::FromStr;
18use std::sync::Arc;
19
20use axum::Extension;
21use axum::extract::{Path, Query, State};
22use axum::http::{HeaderMap, StatusCode as HttpStatusCode};
23use axum::response::IntoResponse;
24use common_catalog::consts::{PARENT_SPAN_ID_COLUMN, TRACE_TABLE_NAME};
25use common_error::ext::ErrorExt;
26use common_error::status_code::StatusCode;
27use common_query::{Output, OutputData};
28use common_recordbatch::util;
29use common_telemetry::{debug, error, tracing, warn};
30use serde::{Deserialize, Deserializer, Serialize, de};
31use serde_json::Value as JsonValue;
32use session::context::{Channel, QueryContext};
33use snafu::{OptionExt, ResultExt};
34
35use crate::error::{
36    CollectRecordbatchSnafu, Error, InvalidJaegerQuerySnafu, Result, status_code_to_http_status,
37};
38use crate::http::HttpRecordsOutput;
39use crate::http::extractor::TraceTableName;
40use crate::metrics::METRIC_JAEGER_QUERY_ELAPSED;
41use crate::otlp::trace::{
42    DURATION_NANO_COLUMN, KEY_OTEL_SCOPE_NAME, KEY_OTEL_SCOPE_VERSION, KEY_OTEL_STATUS_CODE,
43    KEY_OTEL_STATUS_ERROR_KEY, KEY_OTEL_STATUS_MESSAGE, KEY_OTEL_TRACE_STATE, KEY_SERVICE_NAME,
44    KEY_SPAN_KIND, RESOURCE_ATTRIBUTES_COLUMN, SCOPE_NAME_COLUMN, SCOPE_VERSION_COLUMN,
45    SERVICE_NAME_COLUMN, SPAN_ATTRIBUTES_COLUMN, SPAN_EVENTS_COLUMN, SPAN_ID_COLUMN,
46    SPAN_KIND_COLUMN, SPAN_KIND_PREFIX, SPAN_NAME_COLUMN, SPAN_STATUS_CODE, SPAN_STATUS_ERROR,
47    SPAN_STATUS_MESSAGE_COLUMN, SPAN_STATUS_PREFIX, SPAN_STATUS_UNSET, TIMESTAMP_COLUMN,
48    TRACE_ID_COLUMN, TRACE_STATE_COLUMN,
49};
50use crate::query_handler::JaegerQueryHandlerRef;
51
/// Query-context extension key under which a Jaeger request stores the name of
/// the trace table it should read (set by `update_query_context`).
pub const JAEGER_QUERY_TABLE_NAME_KEY: &str = "jaeger_query_table_name";

/// Jaeger `refType` used when a span references its parent span.
const REF_TYPE_CHILD_OF: &str = "CHILD_OF";
/// `chrono` formats tried, in order, when parsing the `time` of a span event
/// (microsecond precision first, then nanosecond precision). Despite the name,
/// these are used for span *event* timestamps.
const SPAN_KIND_TIME_FMTS: [&str; 2] = [
    "%Y-%m-%d %H:%M:%S%.6f%z",
    "%Y-%m-%d %H:%M:%S%.9f%z",
];
56
57/// JaegerAPIResponse is the response of Jaeger HTTP API.
58/// The original version is `structuredResponse` which is defined in https://github.com/jaegertracing/jaeger/blob/main/cmd/query/app/http_handler.go.
59#[derive(Default, Debug, Serialize, Deserialize, PartialEq)]
60pub struct JaegerAPIResponse {
61    pub data: Option<JaegerData>,
62    pub total: usize,
63    pub limit: usize,
64    pub offset: usize,
65    pub errors: Vec<JaegerAPIError>,
66}
67
68/// JaegerData is the query result of Jaeger HTTP API.
69#[derive(Debug, Serialize, Deserialize, PartialEq)]
70#[serde(untagged)]
71pub enum JaegerData {
72    ServiceNames(Vec<String>),
73    OperationsNames(Vec<String>),
74    Operations(Vec<Operation>),
75    Traces(Vec<Trace>),
76}
77
78/// JaegerAPIError is the error of Jaeger HTTP API.
79#[derive(Default, Debug, Serialize, Deserialize, PartialEq)]
80#[serde(rename_all = "camelCase")]
81pub struct JaegerAPIError {
82    pub code: i32,
83    pub msg: String,
84    #[serde(skip_serializing_if = "Option::is_none")]
85    pub trace_id: Option<String>,
86}
87
88/// Operation is an operation in a service.
89#[derive(Debug, Default, Serialize, Deserialize, PartialEq)]
90#[serde(rename_all = "camelCase")]
91pub struct Operation {
92    pub name: String,
93    #[serde(skip_serializing_if = "Option::is_none")]
94    pub span_kind: Option<String>,
95}
96
97/// Trace is a collection of spans.
98#[derive(Debug, Default, Serialize, Deserialize, PartialEq)]
99#[serde(rename_all = "camelCase")]
100pub struct Trace {
101    #[serde(rename = "traceID")]
102    pub trace_id: String,
103    pub spans: Vec<Span>,
104
105    #[serde(skip_serializing_if = "HashMap::is_empty")]
106    pub processes: HashMap<String, Process>,
107
108    #[serde(skip_serializing_if = "Vec::is_empty")]
109    pub warnings: Vec<String>,
110}
111
112/// Span is a single operation within a trace.
113#[derive(Debug, Default, Serialize, Deserialize, PartialEq)]
114#[serde(rename_all = "camelCase")]
115pub struct Span {
116    #[serde(rename = "traceID")]
117    pub trace_id: String,
118
119    #[serde(rename = "spanID")]
120    pub span_id: String,
121
122    #[serde(rename = "parentSpanID")]
123    #[serde(skip_serializing_if = "String::is_empty")]
124    pub parent_span_id: String,
125
126    #[serde(skip_serializing_if = "Option::is_none")]
127    pub flags: Option<u32>,
128
129    pub operation_name: String,
130    pub references: Vec<Reference>,
131    pub start_time: u64, // microseconds since unix epoch
132    pub duration: u64,   // microseconds
133    pub tags: Vec<KeyValue>,
134    pub logs: Vec<Log>,
135
136    #[serde(rename = "processID")]
137    #[serde(skip_serializing_if = "String::is_empty")]
138    pub process_id: String,
139
140    #[serde(skip_serializing_if = "Option::is_none")]
141    pub process: Option<Process>,
142
143    #[serde(skip_serializing_if = "Vec::is_empty")]
144    pub warnings: Vec<String>,
145}
146
147/// Reference is a reference from one span to another.
148#[derive(Debug, Serialize, Deserialize, PartialEq)]
149#[serde(rename_all = "camelCase")]
150pub struct Reference {
151    #[serde(rename = "traceID")]
152    pub trace_id: String,
153    #[serde(rename = "spanID")]
154    pub span_id: String,
155    pub ref_type: String,
156}
157
158/// Process is the process emitting a set of spans.
159#[derive(Debug, Serialize, Deserialize, PartialEq)]
160#[serde(rename_all = "camelCase")]
161pub struct Process {
162    pub service_name: String,
163    pub tags: Vec<KeyValue>,
164}
165
166/// Log is a log emitted in a span.
167#[derive(Debug, Serialize, Deserialize, PartialEq)]
168#[serde(rename_all = "camelCase")]
169pub struct Log {
170    pub timestamp: u64,
171    pub fields: Vec<KeyValue>,
172}
173
174/// KeyValue is a key-value pair with typed value.
175#[derive(Debug, Serialize, Deserialize, PartialEq)]
176#[serde(rename_all = "camelCase")]
177pub struct KeyValue {
178    pub key: String,
179    #[serde(rename = "type")]
180    pub value_type: ValueType,
181    pub value: Value,
182}
183
184/// Value is the value of a key-value pair in Jaeger Span attributes.
185#[derive(Debug, Serialize, Deserialize, PartialEq)]
186#[serde(untagged)]
187#[serde(rename_all = "camelCase")]
188pub enum Value {
189    String(String),
190    Int64(i64),
191    Float64(f64),
192    Boolean(bool),
193    Binary(Vec<u8>),
194}
195
196/// ValueType is the type of a value stored in KeyValue struct.
197#[derive(Debug, Serialize, Deserialize, PartialEq)]
198#[serde(rename_all = "lowercase")]
199pub enum ValueType {
200    String,
201    Int64,
202    Float64,
203    Boolean,
204    Binary,
205}
206
207/// JaegerQueryParams is the query parameters of Jaeger HTTP API.
208#[derive(Default, Debug, Serialize, Deserialize)]
209#[serde(rename_all = "camelCase")]
210pub struct JaegerQueryParams {
211    /// Service name of the trace.
212    #[serde(rename = "service")]
213    pub service_name: Option<String>,
214
215    /// Operation name of the trace.
216    #[serde(rename = "operation")]
217    pub operation_name: Option<String>,
218
219    /// Limit the return data.
220    #[serde(default, deserialize_with = "empty_string_as_none")]
221    pub limit: Option<usize>,
222
223    /// Start time of the trace in microseconds since unix epoch.
224    pub start: Option<i64>,
225
226    /// End time of the trace in microseconds since unix epoch.
227    pub end: Option<i64>,
228
229    /// Max duration string value of the trace. Units can be `ns`, `us` (or `µs`), `ms`, `s`, `m`, `h`.
230    #[serde(default, deserialize_with = "empty_string_as_none")]
231    pub max_duration: Option<String>,
232
233    /// Min duration string value of the trace. Units can be `ns`, `us` (or `µs`), `ms`, `s`, `m`, `h`.
234    #[serde(default, deserialize_with = "empty_string_as_none")]
235    pub min_duration: Option<String>,
236
237    /// Tags of the trace in JSON format. It will be URL encoded in the raw query.
238    /// The decoded format is like: tags="{\"http.status_code\":\"200\",\"latency\":\"11.234\",\"error\":\"false\",\"http.method\":\"GET\",\"http.path\":\"/api/v1/users\"}".
239    /// The key and value of the map are both strings. The key and value is the attribute name and value of the span. The value will be converted to the corresponding type when querying.
240    pub tags: Option<String>,
241
242    /// The span kind of the trace.
243    pub span_kind: Option<String>,
244}
245
246/// Serde deserialization decorator to map empty Strings to None.
247fn empty_string_as_none<'de, D, T>(de: D) -> Result<Option<T>, D::Error>
248where
249    D: Deserializer<'de>,
250    T: FromStr,
251    T::Err: fmt::Display,
252{
253    let opt = Option::<String>::deserialize(de)?;
254    match opt.as_deref() {
255        None | Some("") => Ok(None),
256        Some(s) => FromStr::from_str(s).map_err(de::Error::custom).map(Some),
257    }
258}
259
260fn update_query_context(query_ctx: &mut QueryContext, table_name: Option<String>) {
261    // db should be already handled by middlewares
262    query_ctx.set_channel(Channel::Jaeger);
263    if let Some(table) = table_name {
264        query_ctx.set_extension(JAEGER_QUERY_TABLE_NAME_KEY, table);
265    }
266}
267
268impl QueryTraceParams {
269    fn from_jaeger_query_params(query_params: JaegerQueryParams) -> Result<Self> {
270        let mut internal_query_params: QueryTraceParams = QueryTraceParams {
271            service_name: query_params.service_name.context(InvalidJaegerQuerySnafu {
272                reason: "service_name is required".to_string(),
273            })?,
274            operation_name: query_params.operation_name,
275            // Convert start time from microseconds to nanoseconds.
276            start_time: query_params.start.map(|start| start * 1000),
277            end_time: query_params.end.map(|end| end * 1000),
278            ..Default::default()
279        };
280
281        if let Some(max_duration) = query_params.max_duration {
282            let duration = humantime::parse_duration(&max_duration).map_err(|e| {
283                InvalidJaegerQuerySnafu {
284                    reason: format!("parse maxDuration '{}' failed: {}", max_duration, e),
285                }
286                .build()
287            })?;
288            internal_query_params.max_duration = Some(duration.as_nanos() as u64);
289        }
290
291        if let Some(min_duration) = query_params.min_duration {
292            let duration = humantime::parse_duration(&min_duration).map_err(|e| {
293                InvalidJaegerQuerySnafu {
294                    reason: format!("parse minDuration '{}' failed: {}", min_duration, e),
295                }
296                .build()
297            })?;
298            internal_query_params.min_duration = Some(duration.as_nanos() as u64);
299        }
300
301        if let Some(tags) = query_params.tags {
302            // Serialize the tags to a JSON map.
303            let mut tags_map: HashMap<String, JsonValue> =
304                serde_json::from_str(&tags).map_err(|e| {
305                    InvalidJaegerQuerySnafu {
306                        reason: format!("parse tags '{}' failed: {}", tags, e),
307                    }
308                    .build()
309                })?;
310            for (_, v) in tags_map.iter_mut() {
311                if let Some(number) = convert_string_to_number(v) {
312                    *v = number;
313                }
314                if let Some(boolean) = convert_string_to_boolean(v) {
315                    *v = boolean;
316                }
317            }
318            internal_query_params.tags = Some(tags_map);
319        }
320
321        internal_query_params.limit = query_params.limit;
322
323        Ok(internal_query_params)
324    }
325}
326
327#[derive(Debug, Default, PartialEq)]
328pub struct QueryTraceParams {
329    pub service_name: String,
330    pub operation_name: Option<String>,
331
332    // The limit of the number of traces to return.
333    pub limit: Option<usize>,
334
335    // Select the traces with the given tags(span attributes).
336    pub tags: Option<HashMap<String, JsonValue>>,
337
338    // The unit of the following time related parameters is nanoseconds.
339    pub start_time: Option<i64>,
340    pub end_time: Option<i64>,
341    pub min_duration: Option<u64>,
342    pub max_duration: Option<u64>,
343}
344
345/// Handle the GET `/api/services` request.
346#[axum_macros::debug_handler]
347#[tracing::instrument(skip_all, fields(protocol = "jaeger", request_type = "get_services"))]
348pub async fn handle_get_services(
349    State(handler): State<JaegerQueryHandlerRef>,
350    Query(query_params): Query<JaegerQueryParams>,
351    Extension(mut query_ctx): Extension<QueryContext>,
352    TraceTableName(table_name): TraceTableName,
353) -> impl IntoResponse {
354    debug!(
355        "Received Jaeger '/api/services' request, query_params: {:?}, query_ctx: {:?}",
356        query_params, query_ctx
357    );
358
359    query_ctx.set_channel(Channel::Jaeger);
360    if let Some(table) = table_name {
361        query_ctx.set_extension(JAEGER_QUERY_TABLE_NAME_KEY, table);
362    }
363
364    let query_ctx = Arc::new(query_ctx);
365    let db = query_ctx.get_db_string();
366
367    // Record the query time histogram.
368    let _timer = METRIC_JAEGER_QUERY_ELAPSED
369        .with_label_values(&[&db, "/api/services"])
370        .start_timer();
371
372    match handler.get_services(query_ctx).await {
373        Ok(output) => match covert_to_records(output).await {
374            Ok(Some(records)) => match services_from_records(records) {
375                Ok(services) => {
376                    let services_num = services.len();
377                    (
378                        HttpStatusCode::OK,
379                        axum::Json(JaegerAPIResponse {
380                            data: Some(JaegerData::ServiceNames(services)),
381                            total: services_num,
382                            ..Default::default()
383                        }),
384                    )
385                }
386                Err(err) => {
387                    error!("Failed to get services: {:?}", err);
388                    error_response(err)
389                }
390            },
391            Ok(None) => (HttpStatusCode::OK, axum::Json(JaegerAPIResponse::default())),
392            Err(err) => {
393                error!("Failed to get services: {:?}", err);
394                error_response(err)
395            }
396        },
397        Err(err) => handle_query_error(err, "Failed to get services", &db),
398    }
399}
400
401/// Handle the GET `/api/traces/{trace_id}` request.
402#[axum_macros::debug_handler]
403#[tracing::instrument(skip_all, fields(protocol = "jaeger", request_type = "get_trace"))]
404pub async fn handle_get_trace(
405    State(handler): State<JaegerQueryHandlerRef>,
406    Path(trace_id): Path<String>,
407    Query(query_params): Query<JaegerQueryParams>,
408    Extension(mut query_ctx): Extension<QueryContext>,
409    TraceTableName(table_name): TraceTableName,
410) -> impl IntoResponse {
411    debug!(
412        "Received Jaeger '/api/traces/{}' request, query_params: {:?}, query_ctx: {:?}",
413        trace_id, query_params, query_ctx
414    );
415
416    update_query_context(&mut query_ctx, table_name);
417    let query_ctx = Arc::new(query_ctx);
418    let db = query_ctx.get_db_string();
419
420    // Record the query time histogram.
421    let _timer = METRIC_JAEGER_QUERY_ELAPSED
422        .with_label_values(&[&db, "/api/traces"])
423        .start_timer();
424
425    // Convert start time and end time from microseconds to nanoseconds.
426    let start_time_ns = query_params.start.map(|start_us| start_us * 1000);
427    let end_time_ns = query_params.end.map(|end_us| end_us * 1000);
428
429    let output = match handler
430        .get_trace(query_ctx, &trace_id, start_time_ns, end_time_ns)
431        .await
432    {
433        Ok(output) => output,
434        Err(err) => {
435            return handle_query_error(
436                err,
437                &format!("Failed to get trace for '{}'", trace_id),
438                &db,
439            );
440        }
441    };
442
443    match covert_to_records(output).await {
444        Ok(Some(records)) => match traces_from_records(records) {
445            Ok(traces) => (
446                HttpStatusCode::OK,
447                axum::Json(JaegerAPIResponse {
448                    data: Some(JaegerData::Traces(traces)),
449                    ..Default::default()
450                }),
451            ),
452            Err(err) => {
453                error!("Failed to get trace '{}': {:?}", trace_id, err);
454                error_response(err)
455            }
456        },
457        Ok(None) => (HttpStatusCode::OK, axum::Json(JaegerAPIResponse::default())),
458        Err(err) => {
459            error!("Failed to get trace '{}': {:?}", trace_id, err);
460            error_response(err)
461        }
462    }
463}
464
465/// Handle the GET `/api/traces` request.
466#[axum_macros::debug_handler]
467#[tracing::instrument(skip_all, fields(protocol = "jaeger", request_type = "find_traces"))]
468pub async fn handle_find_traces(
469    State(handler): State<JaegerQueryHandlerRef>,
470    Query(query_params): Query<JaegerQueryParams>,
471    Extension(mut query_ctx): Extension<QueryContext>,
472    TraceTableName(table_name): TraceTableName,
473) -> impl IntoResponse {
474    debug!(
475        "Received Jaeger '/api/traces' request, query_params: {:?}, query_ctx: {:?}",
476        query_params, query_ctx
477    );
478
479    update_query_context(&mut query_ctx, table_name);
480    let query_ctx = Arc::new(query_ctx);
481    let db = query_ctx.get_db_string();
482
483    // Record the query time histogram.
484    let _timer = METRIC_JAEGER_QUERY_ELAPSED
485        .with_label_values(&[&db, "/api/traces"])
486        .start_timer();
487
488    match QueryTraceParams::from_jaeger_query_params(query_params) {
489        Ok(query_params) => {
490            let output = handler.find_traces(query_ctx, query_params).await;
491            match output {
492                Ok(output) => match covert_to_records(output).await {
493                    Ok(Some(records)) => match traces_from_records(records) {
494                        Ok(traces) => (
495                            HttpStatusCode::OK,
496                            axum::Json(JaegerAPIResponse {
497                                data: Some(JaegerData::Traces(traces)),
498                                ..Default::default()
499                            }),
500                        ),
501                        Err(err) => {
502                            error!("Failed to find traces: {:?}", err);
503                            error_response(err)
504                        }
505                    },
506                    Ok(None) => (HttpStatusCode::OK, axum::Json(JaegerAPIResponse::default())),
507                    Err(err) => error_response(err),
508                },
509                Err(err) => handle_query_error(err, "Failed to find traces", &db),
510            }
511        }
512        Err(e) => error_response(e),
513    }
514}
515
516/// Handle the GET `/api/operations` request.
517#[axum_macros::debug_handler]
518#[tracing::instrument(skip_all, fields(protocol = "jaeger", request_type = "get_operations"))]
519pub async fn handle_get_operations(
520    State(handler): State<JaegerQueryHandlerRef>,
521    Query(query_params): Query<JaegerQueryParams>,
522    Extension(mut query_ctx): Extension<QueryContext>,
523    TraceTableName(table_name): TraceTableName,
524    headers: HeaderMap,
525) -> impl IntoResponse {
526    debug!(
527        "Received Jaeger '/api/operations' request, query_params: {:?}, query_ctx: {:?}, headers: {:?}",
528        query_params, query_ctx, headers
529    );
530
531    if let Some(service_name) = &query_params.service_name {
532        update_query_context(&mut query_ctx, table_name);
533        let query_ctx = Arc::new(query_ctx);
534        let db = query_ctx.get_db_string();
535
536        // Record the query time histogram.
537        let _timer = METRIC_JAEGER_QUERY_ELAPSED
538            .with_label_values(&[&db, "/api/operations"])
539            .start_timer();
540
541        match handler
542            .get_operations(query_ctx, service_name, query_params.span_kind.as_deref())
543            .await
544        {
545            Ok(output) => match covert_to_records(output).await {
546                Ok(Some(records)) => match operations_from_records(records, true) {
547                    Ok(operations) => {
548                        let total = operations.len();
549                        (
550                            HttpStatusCode::OK,
551                            axum::Json(JaegerAPIResponse {
552                                data: Some(JaegerData::Operations(operations)),
553                                total,
554                                ..Default::default()
555                            }),
556                        )
557                    }
558                    Err(err) => {
559                        error!("Failed to get operations: {:?}", err);
560                        error_response(err)
561                    }
562                },
563                Ok(None) => (HttpStatusCode::OK, axum::Json(JaegerAPIResponse::default())),
564                Err(err) => error_response(err),
565            },
566            Err(err) => handle_query_error(
567                err,
568                &format!("Failed to get operations for service '{}'", service_name),
569                &db,
570            ),
571        }
572    } else {
573        (
574            HttpStatusCode::BAD_REQUEST,
575            axum::Json(JaegerAPIResponse {
576                errors: vec![JaegerAPIError {
577                    code: 400,
578                    msg: "parameter 'service' is required".to_string(),
579                    trace_id: None,
580                }],
581                ..Default::default()
582            }),
583        )
584    }
585}
586
587/// Handle the GET `/api/services/{service_name}/operations` request.
588#[axum_macros::debug_handler]
589#[tracing::instrument(
590    skip_all,
591    fields(protocol = "jaeger", request_type = "get_operations_by_service")
592)]
593pub async fn handle_get_operations_by_service(
594    State(handler): State<JaegerQueryHandlerRef>,
595    Path(service_name): Path<String>,
596    Query(query_params): Query<JaegerQueryParams>,
597    Extension(mut query_ctx): Extension<QueryContext>,
598    TraceTableName(table_name): TraceTableName,
599    headers: HeaderMap,
600) -> impl IntoResponse {
601    debug!(
602        "Received Jaeger '/api/services/{}/operations' request, query_params: {:?}, query_ctx: {:?}, headers: {:?}",
603        service_name, query_params, query_ctx, headers
604    );
605
606    update_query_context(&mut query_ctx, table_name);
607    let query_ctx = Arc::new(query_ctx);
608    let db = query_ctx.get_db_string();
609
610    // Record the query time histogram.
611    let _timer = METRIC_JAEGER_QUERY_ELAPSED
612        .with_label_values(&[&db, "/api/services"])
613        .start_timer();
614
615    match handler.get_operations(query_ctx, &service_name, None).await {
616        Ok(output) => match covert_to_records(output).await {
617            Ok(Some(records)) => match operations_from_records(records, false) {
618                Ok(operations) => {
619                    let operations: Vec<String> =
620                        operations.into_iter().map(|op| op.name).collect();
621                    let total = operations.len();
622                    (
623                        HttpStatusCode::OK,
624                        axum::Json(JaegerAPIResponse {
625                            data: Some(JaegerData::OperationsNames(operations)),
626                            total,
627                            ..Default::default()
628                        }),
629                    )
630                }
631                Err(err) => {
632                    error!(
633                        "Failed to get operations for service '{}': {:?}",
634                        service_name, err
635                    );
636                    error_response(err)
637                }
638            },
639            Ok(None) => (HttpStatusCode::OK, axum::Json(JaegerAPIResponse::default())),
640            Err(err) => error_response(err),
641        },
642        Err(err) => handle_query_error(
643            err,
644            &format!("Failed to get operations for service '{}'", service_name),
645            &db,
646        ),
647    }
648}
649
650async fn covert_to_records(output: Output) -> Result<Option<HttpRecordsOutput>> {
651    match output.data {
652        OutputData::Stream(stream) => {
653            let records = HttpRecordsOutput::try_new(
654                stream.schema().clone(),
655                util::collect(stream)
656                    .await
657                    .context(CollectRecordbatchSnafu)?,
658            )?;
659            debug!(
660                "The query records: {}",
661                serde_json::to_string(&records).unwrap()
662            );
663            Ok(Some(records))
664        }
665        // It's unlikely to happen. However, if the output is not a stream, return None.
666        _ => Ok(None),
667    }
668}
669
670fn handle_query_error(
671    err: Error,
672    prompt: &str,
673    db: &str,
674) -> (HttpStatusCode, axum::Json<JaegerAPIResponse>) {
675    // To compatible with the Jaeger API, if the trace table is not found, return an empty response instead of an error.
676    if err.status_code() == StatusCode::TableNotFound {
677        warn!(
678            "No trace table '{}' found in database '{}'",
679            TRACE_TABLE_NAME, db
680        );
681        (HttpStatusCode::OK, axum::Json(JaegerAPIResponse::default()))
682    } else {
683        error!("{}: {:?}", prompt, err);
684        error_response(err)
685    }
686}
687
688fn error_response(err: Error) -> (HttpStatusCode, axum::Json<JaegerAPIResponse>) {
689    (
690        status_code_to_http_status(&err.status_code()),
691        axum::Json(JaegerAPIResponse {
692            errors: vec![JaegerAPIError {
693                code: err.status_code() as i32,
694                msg: err.to_string(),
695                ..Default::default()
696            }],
697            ..Default::default()
698        }),
699    )
700}
701
702fn traces_from_records(records: HttpRecordsOutput) -> Result<Vec<Trace>> {
703    // maintain the mapping: trace_id -> (process_id -> service_name).
704    let mut trace_id_to_processes: HashMap<String, HashMap<String, String>> = HashMap::new();
705    // maintain the mapping: trace_id -> spans.
706    // use BTreeMap to retain order
707    let mut trace_id_to_spans: BTreeMap<String, Vec<Span>> = BTreeMap::new();
708    // maintain the mapping: service.name -> resource.attributes.
709    let mut service_to_resource_attributes: HashMap<String, Vec<KeyValue>> = HashMap::new();
710
711    let is_span_attributes_flatten = !records
712        .schema
713        .column_schemas
714        .iter()
715        .any(|c| c.name == SPAN_ATTRIBUTES_COLUMN);
716
717    for row in records.rows.into_iter() {
718        let mut span = Span::default();
719        let mut service_name = None;
720        let mut resource_tags = vec![];
721
722        for (idx, cell) in row.into_iter().enumerate() {
723            // safe to use index here
724            let column_name = &records.schema.column_schemas[idx].name;
725
726            match column_name.as_str() {
727                TRACE_ID_COLUMN => {
728                    if let JsonValue::String(trace_id) = cell {
729                        span.trace_id = trace_id.clone();
730                        trace_id_to_processes.entry(trace_id).or_default();
731                    }
732                }
733                TIMESTAMP_COLUMN => {
734                    span.start_time = cell.as_u64().context(InvalidJaegerQuerySnafu {
735                        reason: "Failed to convert timestamp to u64".to_string(),
736                    })? / 1000;
737                }
738                DURATION_NANO_COLUMN => {
739                    span.duration = cell.as_u64().context(InvalidJaegerQuerySnafu {
740                        reason: "Failed to convert duration to u64".to_string(),
741                    })? / 1000;
742                }
743                SERVICE_NAME_COLUMN => {
744                    if let JsonValue::String(name) = cell {
745                        service_name = Some(name);
746                    }
747                }
748                SPAN_NAME_COLUMN => {
749                    if let JsonValue::String(span_name) = cell {
750                        span.operation_name = span_name;
751                    }
752                }
753                SPAN_ID_COLUMN => {
754                    if let JsonValue::String(span_id) = cell {
755                        span.span_id = span_id;
756                    }
757                }
758                SPAN_ATTRIBUTES_COLUMN => {
759                    // for v0 data model, span_attributes are nested as a json
760                    // data structure
761                    if let JsonValue::Object(span_attrs) = cell {
762                        span.tags.extend(object_to_tags(span_attrs));
763                    }
764                }
765                RESOURCE_ATTRIBUTES_COLUMN => {
766                    // for v0 data model, resource_attributes are nested as a json
767                    // data structure
768
769                    if let JsonValue::Object(mut resource_attrs) = cell {
770                        resource_attrs.remove(KEY_SERVICE_NAME);
771                        resource_tags = object_to_tags(resource_attrs);
772                    }
773                }
774                PARENT_SPAN_ID_COLUMN => {
775                    if let JsonValue::String(parent_span_id) = cell
776                        && !parent_span_id.is_empty()
777                    {
778                        span.references.push(Reference {
779                            trace_id: span.trace_id.clone(),
780                            span_id: parent_span_id,
781                            ref_type: REF_TYPE_CHILD_OF.to_string(),
782                        });
783                    }
784                }
785                SPAN_EVENTS_COLUMN => {
786                    if let JsonValue::Array(events) = cell {
787                        for event in events {
788                            if let JsonValue::Object(mut obj) = event {
789                                let Some(action) = obj.get("name").and_then(|v| v.as_str()) else {
790                                    continue;
791                                };
792
793                                let Some(t) =
794                                    obj.get("time").and_then(|t| t.as_str()).and_then(|s| {
795                                        SPAN_KIND_TIME_FMTS
796                                            .iter()
797                                            .find_map(|fmt| {
798                                                chrono::DateTime::parse_from_str(s, fmt).ok()
799                                            })
800                                            .map(|dt| dt.timestamp_micros() as u64)
801                                    })
802                                else {
803                                    continue;
804                                };
805
806                                let mut fields = vec![KeyValue {
807                                    key: "event".to_string(),
808                                    value_type: ValueType::String,
809                                    value: Value::String(action.to_string()),
810                                }];
811
812                                // Add event attributes as fields
813                                if let Some(JsonValue::Object(attrs)) = obj.remove("attributes") {
814                                    fields.extend(object_to_tags(attrs));
815                                }
816
817                                span.logs.push(Log {
818                                    timestamp: t,
819                                    fields,
820                                });
821                            }
822                        }
823                    }
824                }
825                SCOPE_NAME_COLUMN => {
826                    if let JsonValue::String(scope_name) = cell
827                        && !scope_name.is_empty()
828                    {
829                        span.tags.push(KeyValue {
830                            key: KEY_OTEL_SCOPE_NAME.to_string(),
831                            value_type: ValueType::String,
832                            value: Value::String(scope_name),
833                        });
834                    }
835                }
836                SCOPE_VERSION_COLUMN => {
837                    if let JsonValue::String(scope_version) = cell
838                        && !scope_version.is_empty()
839                    {
840                        span.tags.push(KeyValue {
841                            key: KEY_OTEL_SCOPE_VERSION.to_string(),
842                            value_type: ValueType::String,
843                            value: Value::String(scope_version),
844                        });
845                    }
846                }
847                SPAN_KIND_COLUMN => {
848                    if let JsonValue::String(span_kind) = cell
849                        && !span_kind.is_empty()
850                    {
851                        span.tags.push(KeyValue {
852                            key: KEY_SPAN_KIND.to_string(),
853                            value_type: ValueType::String,
854                            value: Value::String(normalize_span_kind(&span_kind)),
855                        });
856                    }
857                }
858                SPAN_STATUS_CODE => {
859                    if let JsonValue::String(span_status) = cell
860                        && span_status != SPAN_STATUS_UNSET
861                        && !span_status.is_empty()
862                    {
863                        span.tags.push(KeyValue {
864                            key: KEY_OTEL_STATUS_CODE.to_string(),
865                            value_type: ValueType::String,
866                            value: Value::String(normalize_status_code(&span_status)),
867                        });
868                        // set error to comply with the Jaeger API
869                        if span_status == SPAN_STATUS_ERROR {
870                            span.tags.push(KeyValue {
871                                key: KEY_OTEL_STATUS_ERROR_KEY.to_string(),
872                                value_type: ValueType::Boolean,
873                                value: Value::Boolean(true),
874                            });
875                        }
876                    }
877                }
878
879                SPAN_STATUS_MESSAGE_COLUMN => {
880                    if let JsonValue::String(span_status_message) = cell
881                        && !span_status_message.is_empty()
882                    {
883                        span.tags.push(KeyValue {
884                            key: KEY_OTEL_STATUS_MESSAGE.to_string(),
885                            value_type: ValueType::String,
886                            value: Value::String(span_status_message),
887                        });
888                    }
889                }
890
891                TRACE_STATE_COLUMN => {
892                    if let JsonValue::String(trace_state) = cell
893                        && !trace_state.is_empty()
894                    {
895                        span.tags.push(KeyValue {
896                            key: KEY_OTEL_TRACE_STATE.to_string(),
897                            value_type: ValueType::String,
898                            value: Value::String(trace_state),
899                        });
900                    }
901                }
902
903                _ => {
904                    // this this v1 data model
905                    if is_span_attributes_flatten {
906                        const SPAN_ATTR_PREFIX: &str = "span_attributes.";
907                        const RESOURCE_ATTR_PREFIX: &str = "resource_attributes.";
908                        // a span attributes column
909                        if column_name.starts_with(SPAN_ATTR_PREFIX) {
910                            if let Some(keyvalue) = to_keyvalue(
911                                column_name
912                                    .strip_prefix(SPAN_ATTR_PREFIX)
913                                    .unwrap_or_default()
914                                    .to_string(),
915                                cell,
916                            ) {
917                                span.tags.push(keyvalue);
918                            }
919                        } else if column_name.starts_with(RESOURCE_ATTR_PREFIX)
920                            && let Some(keyvalue) = to_keyvalue(
921                                column_name
922                                    .strip_prefix(RESOURCE_ATTR_PREFIX)
923                                    .unwrap_or_default()
924                                    .to_string(),
925                                cell,
926                            )
927                        {
928                            resource_tags.push(keyvalue);
929                        }
930                    }
931                }
932            }
933        }
934
935        if let Some(service_name) = service_name {
936            if !service_to_resource_attributes.contains_key(&service_name) {
937                service_to_resource_attributes.insert(service_name.clone(), resource_tags);
938            }
939
940            if let Some(process) = trace_id_to_processes.get_mut(&span.trace_id) {
941                if let Some(process_id) = process.get(&service_name) {
942                    span.process_id = process_id.clone();
943                } else {
944                    // Allocate a new process id.
945                    let process_id = format!("p{}", process.len() + 1);
946                    process.insert(service_name, process_id.clone());
947                    span.process_id = process_id;
948                }
949            }
950        }
951
952        // ensure span tags order
953        span.tags.sort_by(|a, b| a.key.cmp(&b.key));
954
955        if let Some(spans) = trace_id_to_spans.get_mut(&span.trace_id) {
956            spans.push(span);
957        } else {
958            trace_id_to_spans.insert(span.trace_id.clone(), vec![span]);
959        }
960    }
961
962    let mut traces = Vec::new();
963    for (trace_id, spans) in trace_id_to_spans {
964        let mut trace = Trace {
965            trace_id,
966            spans,
967            ..Default::default()
968        };
969
970        if let Some(processes) = trace_id_to_processes.remove(&trace.trace_id) {
971            let mut process_id_to_process = HashMap::new();
972            for (service_name, process_id) in processes.into_iter() {
973                let tags = service_to_resource_attributes
974                    .remove(&service_name)
975                    .unwrap_or_default();
976                process_id_to_process.insert(process_id, Process { service_name, tags });
977            }
978            trace.processes = process_id_to_process;
979        }
980        traces.push(trace);
981    }
982
983    Ok(traces)
984}
985
986fn to_keyvalue(key: String, value: JsonValue) -> Option<KeyValue> {
987    match value {
988        JsonValue::String(value) => Some(KeyValue {
989            key,
990            value_type: ValueType::String,
991            value: Value::String(value.clone()),
992        }),
993        JsonValue::Number(value) => Some(KeyValue {
994            key,
995            value_type: ValueType::Int64,
996            value: Value::Int64(value.as_i64().unwrap_or(0)),
997        }),
998        JsonValue::Bool(value) => Some(KeyValue {
999            key,
1000            value_type: ValueType::Boolean,
1001            value: Value::Boolean(value),
1002        }),
1003        JsonValue::Array(value) => Some(KeyValue {
1004            key,
1005            value_type: ValueType::String,
1006            value: Value::String(serde_json::to_string(&value).unwrap()),
1007        }),
1008        JsonValue::Object(value) => Some(KeyValue {
1009            key,
1010            value_type: ValueType::String,
1011            value: Value::String(serde_json::to_string(&value).unwrap()),
1012        }),
1013        JsonValue::Null => None,
1014    }
1015}
1016
1017fn object_to_tags(object: serde_json::map::Map<String, JsonValue>) -> Vec<KeyValue> {
1018    object
1019        .into_iter()
1020        .filter_map(|(key, value)| to_keyvalue(key, value))
1021        .collect()
1022}
1023
1024fn services_from_records(records: HttpRecordsOutput) -> Result<Vec<String>> {
1025    let expected_schema = vec![(SERVICE_NAME_COLUMN, "String")];
1026    check_schema(&records, &expected_schema)?;
1027
1028    let mut services = Vec::with_capacity(records.total_rows);
1029    for row in records.rows.into_iter() {
1030        for value in row.into_iter() {
1031            if let JsonValue::String(service_name) = value {
1032                services.push(service_name);
1033            }
1034        }
1035    }
1036    Ok(services)
1037}
1038
1039// Construct Jaeger operations from records.
1040fn operations_from_records(
1041    records: HttpRecordsOutput,
1042    contain_span_kind: bool,
1043) -> Result<Vec<Operation>> {
1044    let expected_schema = vec![(SPAN_NAME_COLUMN, "String"), (SPAN_KIND_COLUMN, "String")];
1045    check_schema(&records, &expected_schema)?;
1046
1047    let mut operations = Vec::with_capacity(records.total_rows);
1048    for row in records.rows.into_iter() {
1049        let mut row_iter = row.into_iter();
1050        if let Some(JsonValue::String(operation)) = row_iter.next() {
1051            let mut operation = Operation {
1052                name: operation,
1053                span_kind: None,
1054            };
1055            if contain_span_kind {
1056                if let Some(JsonValue::String(span_kind)) = row_iter.next() {
1057                    operation.span_kind = Some(normalize_span_kind(&span_kind));
1058                }
1059            } else {
1060                // skip span kind.
1061                row_iter.next();
1062            }
1063            operations.push(operation);
1064        }
1065    }
1066
1067    Ok(operations)
1068}
1069
1070// Check whether the schema of the records is correct.
1071fn check_schema(records: &HttpRecordsOutput, expected_schema: &[(&str, &str)]) -> Result<()> {
1072    for (i, column) in records.schema.column_schemas.iter().enumerate() {
1073        if column.name != expected_schema[i].0 || column.data_type != expected_schema[i].1 {
1074            InvalidJaegerQuerySnafu {
1075                reason: "query result schema is not correct".to_string(),
1076            }
1077            .fail()?
1078        }
1079    }
1080    Ok(())
1081}
1082
1083// By default, the span kind is stored as `SPAN_KIND_<kind>` in GreptimeDB.
1084// However, in Jaeger API, the span kind is returned as `<kind>` which is the lowercase of the span kind and without the `SPAN_KIND_` prefix.
1085fn normalize_span_kind(span_kind: &str) -> String {
1086    // If the span_kind starts with `SPAN_KIND_` prefix, remove it and convert to lowercase.
1087    if let Some(stripped) = span_kind.strip_prefix(SPAN_KIND_PREFIX) {
1088        stripped.to_lowercase()
1089    } else {
1090        // It's unlikely to happen. However, we still convert it to lowercase for consistency.
1091        span_kind.to_lowercase()
1092    }
1093}
1094
1095// By default, the status code is stored as `STATUS_CODE_<code>` in GreptimeDB.
1096// However, in Jaeger API, the status code is returned as `<code>` without the `STATUS_CODE_` prefix.
1097fn normalize_status_code(status_code: &str) -> String {
1098    // If the span_kind starts with `SPAN_KIND_` prefix, remove it and convert to lowercase.
1099    if let Some(stripped) = status_code.strip_prefix(SPAN_STATUS_PREFIX) {
1100        stripped.to_string()
1101    } else {
1102        // It's unlikely to happen
1103        status_code.to_string()
1104    }
1105}
1106
1107fn convert_string_to_number(input: &serde_json::Value) -> Option<serde_json::Value> {
1108    if let Some(data) = input.as_str() {
1109        if let Ok(number) = data.parse::<i64>() {
1110            return Some(serde_json::Value::Number(serde_json::Number::from(number)));
1111        }
1112        if let Ok(number) = data.parse::<f64>()
1113            && let Some(number) = serde_json::Number::from_f64(number)
1114        {
1115            return Some(serde_json::Value::Number(number));
1116        }
1117    }
1118
1119    None
1120}
1121
1122fn convert_string_to_boolean(input: &serde_json::Value) -> Option<serde_json::Value> {
1123    if let Some(data) = input.as_str() {
1124        if data == "true" {
1125            return Some(serde_json::Value::Bool(true));
1126        }
1127        if data == "false" {
1128            return Some(serde_json::Value::Bool(false));
1129        }
1130    }
1131
1132    None
1133}
1134
1135#[cfg(test)]
1136mod tests {
1137    use serde_json::{Number, Value as JsonValue, json};
1138
1139    use super::*;
1140    use crate::http::{ColumnSchema, HttpRecordsOutput, OutputSchema};
1141
1142    #[test]
1143    fn test_services_from_records() {
1144        // The tests is the tuple of `(test_records, expected)`.
1145        let tests = vec![(
1146            HttpRecordsOutput {
1147                schema: OutputSchema {
1148                    column_schemas: vec![ColumnSchema {
1149                        name: "service_name".to_string(),
1150                        data_type: "String".to_string(),
1151                    }],
1152                },
1153                rows: vec![
1154                    vec![JsonValue::String("test-service-0".to_string())],
1155                    vec![JsonValue::String("test-service-1".to_string())],
1156                ],
1157                total_rows: 2,
1158                metrics: HashMap::new(),
1159            },
1160            vec!["test-service-0".to_string(), "test-service-1".to_string()],
1161        )];
1162
1163        for (records, expected) in tests {
1164            let services = services_from_records(records).unwrap();
1165            assert_eq!(services, expected);
1166        }
1167    }
1168
1169    #[test]
1170    fn test_operations_from_records() {
1171        // The tests is the tuple of `(test_records, contain_span_kind, expected)`.
1172        let tests = vec![
1173            (
1174                HttpRecordsOutput {
1175                    schema: OutputSchema {
1176                        column_schemas: vec![
1177                            ColumnSchema {
1178                                name: "span_name".to_string(),
1179                                data_type: "String".to_string(),
1180                            },
1181                            ColumnSchema {
1182                                name: "span_kind".to_string(),
1183                                data_type: "String".to_string(),
1184                            },
1185                        ],
1186                    },
1187                    rows: vec![
1188                        vec![
1189                            JsonValue::String("access-mysql".to_string()),
1190                            JsonValue::String("SPAN_KIND_SERVER".to_string()),
1191                        ],
1192                        vec![
1193                            JsonValue::String("access-redis".to_string()),
1194                            JsonValue::String("SPAN_KIND_CLIENT".to_string()),
1195                        ],
1196                    ],
1197                    total_rows: 2,
1198                    metrics: HashMap::new(),
1199                },
1200                false,
1201                vec![
1202                    Operation {
1203                        name: "access-mysql".to_string(),
1204                        span_kind: None,
1205                    },
1206                    Operation {
1207                        name: "access-redis".to_string(),
1208                        span_kind: None,
1209                    },
1210                ],
1211            ),
1212            (
1213                HttpRecordsOutput {
1214                    schema: OutputSchema {
1215                        column_schemas: vec![
1216                            ColumnSchema {
1217                                name: "span_name".to_string(),
1218                                data_type: "String".to_string(),
1219                            },
1220                            ColumnSchema {
1221                                name: "span_kind".to_string(),
1222                                data_type: "String".to_string(),
1223                            },
1224                        ],
1225                    },
1226                    rows: vec![
1227                        vec![
1228                            JsonValue::String("access-mysql".to_string()),
1229                            JsonValue::String("SPAN_KIND_SERVER".to_string()),
1230                        ],
1231                        vec![
1232                            JsonValue::String("access-redis".to_string()),
1233                            JsonValue::String("SPAN_KIND_CLIENT".to_string()),
1234                        ],
1235                    ],
1236                    total_rows: 2,
1237                    metrics: HashMap::new(),
1238                },
1239                true,
1240                vec![
1241                    Operation {
1242                        name: "access-mysql".to_string(),
1243                        span_kind: Some("server".to_string()),
1244                    },
1245                    Operation {
1246                        name: "access-redis".to_string(),
1247                        span_kind: Some("client".to_string()),
1248                    },
1249                ],
1250            ),
1251        ];
1252
1253        for (records, contain_span_kind, expected) in tests {
1254            let operations = operations_from_records(records, contain_span_kind).unwrap();
1255            assert_eq!(operations, expected);
1256        }
1257    }
1258
1259    #[test]
1260    fn test_traces_from_records() {
1261        // The tests is the tuple of `(test_records, expected)`.
1262        let tests = vec![(
1263            HttpRecordsOutput {
1264                schema: OutputSchema {
1265                    column_schemas: vec![
1266                        ColumnSchema {
1267                            name: "trace_id".to_string(),
1268                            data_type: "String".to_string(),
1269                        },
1270                        ColumnSchema {
1271                            name: "timestamp".to_string(),
1272                            data_type: "TimestampNanosecond".to_string(),
1273                        },
1274                        ColumnSchema {
1275                            name: "duration_nano".to_string(),
1276                            data_type: "UInt64".to_string(),
1277                        },
1278                        ColumnSchema {
1279                            name: "service_name".to_string(),
1280                            data_type: "String".to_string(),
1281                        },
1282                        ColumnSchema {
1283                            name: "span_name".to_string(),
1284                            data_type: "String".to_string(),
1285                        },
1286                        ColumnSchema {
1287                            name: "span_id".to_string(),
1288                            data_type: "String".to_string(),
1289                        },
1290                        ColumnSchema {
1291                            name: "span_attributes".to_string(),
1292                            data_type: "Json".to_string(),
1293                        },
1294                    ],
1295                },
1296                rows: vec![
1297                    vec![
1298                        JsonValue::String("5611dce1bc9ebed65352d99a027b08ea".to_string()),
1299                        JsonValue::Number(Number::from_u128(1738726754492422000).unwrap()),
1300                        JsonValue::Number(Number::from_u128(100000000).unwrap()),
1301                        JsonValue::String("test-service-0".to_string()),
1302                        JsonValue::String("access-mysql".to_string()),
1303                        JsonValue::String("008421dbbd33a3e9".to_string()),
1304                        JsonValue::Object(
1305                            json!({
1306                                "operation.type": "access-mysql",
1307                            })
1308                            .as_object()
1309                            .unwrap()
1310                            .clone(),
1311                        ),
1312                    ],
1313                    vec![
1314                        JsonValue::String("5611dce1bc9ebed65352d99a027b08ea".to_string()),
1315                        JsonValue::Number(Number::from_u128(1738726754642422000).unwrap()),
1316                        JsonValue::Number(Number::from_u128(100000000).unwrap()),
1317                        JsonValue::String("test-service-0".to_string()),
1318                        JsonValue::String("access-redis".to_string()),
1319                        JsonValue::String("ffa03416a7b9ea48".to_string()),
1320                        JsonValue::Object(
1321                            json!({
1322                                "operation.type": "access-redis",
1323                            })
1324                            .as_object()
1325                            .unwrap()
1326                            .clone(),
1327                        ),
1328                    ],
1329                ],
1330                total_rows: 2,
1331                metrics: HashMap::new(),
1332            },
1333            vec![Trace {
1334                trace_id: "5611dce1bc9ebed65352d99a027b08ea".to_string(),
1335                spans: vec![
1336                    Span {
1337                        trace_id: "5611dce1bc9ebed65352d99a027b08ea".to_string(),
1338                        span_id: "008421dbbd33a3e9".to_string(),
1339                        operation_name: "access-mysql".to_string(),
1340                        start_time: 1738726754492422,
1341                        duration: 100000,
1342                        tags: vec![KeyValue {
1343                            key: "operation.type".to_string(),
1344                            value_type: ValueType::String,
1345                            value: Value::String("access-mysql".to_string()),
1346                        }],
1347                        process_id: "p1".to_string(),
1348                        ..Default::default()
1349                    },
1350                    Span {
1351                        trace_id: "5611dce1bc9ebed65352d99a027b08ea".to_string(),
1352                        span_id: "ffa03416a7b9ea48".to_string(),
1353                        operation_name: "access-redis".to_string(),
1354                        start_time: 1738726754642422,
1355                        duration: 100000,
1356                        tags: vec![KeyValue {
1357                            key: "operation.type".to_string(),
1358                            value_type: ValueType::String,
1359                            value: Value::String("access-redis".to_string()),
1360                        }],
1361                        process_id: "p1".to_string(),
1362                        ..Default::default()
1363                    },
1364                ],
1365                processes: HashMap::from([(
1366                    "p1".to_string(),
1367                    Process {
1368                        service_name: "test-service-0".to_string(),
1369                        tags: vec![],
1370                    },
1371                )]),
1372                ..Default::default()
1373            }],
1374        )];
1375
1376        for (records, expected) in tests {
1377            let traces = traces_from_records(records).unwrap();
1378            assert_eq!(traces, expected);
1379        }
1380    }
1381
1382    #[test]
1383    fn test_traces_from_v1_records() {
1384        // The tests is the tuple of `(test_records, expected)`.
1385        let tests = vec![(
1386            HttpRecordsOutput {
1387                schema: OutputSchema {
1388                    column_schemas: vec![
1389                        ColumnSchema {
1390                            name: "trace_id".to_string(),
1391                            data_type: "String".to_string(),
1392                        },
1393                        ColumnSchema {
1394                            name: "timestamp".to_string(),
1395                            data_type: "TimestampNanosecond".to_string(),
1396                        },
1397                        ColumnSchema {
1398                            name: "duration_nano".to_string(),
1399                            data_type: "UInt64".to_string(),
1400                        },
1401                        ColumnSchema {
1402                            name: "service_name".to_string(),
1403                            data_type: "String".to_string(),
1404                        },
1405                        ColumnSchema {
1406                            name: "span_name".to_string(),
1407                            data_type: "String".to_string(),
1408                        },
1409                        ColumnSchema {
1410                            name: "span_id".to_string(),
1411                            data_type: "String".to_string(),
1412                        },
1413                        ColumnSchema {
1414                            name: "span_attributes.http.request.method".to_string(),
1415                            data_type: "String".to_string(),
1416                        },
1417                        ColumnSchema {
1418                            name: "span_attributes.http.request.url".to_string(),
1419                            data_type: "String".to_string(),
1420                        },
1421                        ColumnSchema {
1422                            name: "span_attributes.http.status_code".to_string(),
1423                            data_type: "UInt64".to_string(),
1424                        },
1425                    ],
1426                },
1427                rows: vec![
1428                    vec![
1429                        JsonValue::String("5611dce1bc9ebed65352d99a027b08ea".to_string()),
1430                        JsonValue::Number(Number::from_u128(1738726754492422000).unwrap()),
1431                        JsonValue::Number(Number::from_u128(100000000).unwrap()),
1432                        JsonValue::String("test-service-0".to_string()),
1433                        JsonValue::String("access-mysql".to_string()),
1434                        JsonValue::String("008421dbbd33a3e9".to_string()),
1435                        JsonValue::String("GET".to_string()),
1436                        JsonValue::String("/data".to_string()),
1437                        JsonValue::Number(Number::from_u128(200).unwrap()),
1438                    ],
1439                    vec![
1440                        JsonValue::String("5611dce1bc9ebed65352d99a027b08ea".to_string()),
1441                        JsonValue::Number(Number::from_u128(1738726754642422000).unwrap()),
1442                        JsonValue::Number(Number::from_u128(100000000).unwrap()),
1443                        JsonValue::String("test-service-0".to_string()),
1444                        JsonValue::String("access-redis".to_string()),
1445                        JsonValue::String("ffa03416a7b9ea48".to_string()),
1446                        JsonValue::String("POST".to_string()),
1447                        JsonValue::String("/create".to_string()),
1448                        JsonValue::Number(Number::from_u128(400).unwrap()),
1449                    ],
1450                ],
1451                total_rows: 2,
1452                metrics: HashMap::new(),
1453            },
1454            vec![Trace {
1455                trace_id: "5611dce1bc9ebed65352d99a027b08ea".to_string(),
1456                spans: vec![
1457                    Span {
1458                        trace_id: "5611dce1bc9ebed65352d99a027b08ea".to_string(),
1459                        span_id: "008421dbbd33a3e9".to_string(),
1460                        operation_name: "access-mysql".to_string(),
1461                        start_time: 1738726754492422,
1462                        duration: 100000,
1463                        tags: vec![
1464                            KeyValue {
1465                                key: "http.request.method".to_string(),
1466                                value_type: ValueType::String,
1467                                value: Value::String("GET".to_string()),
1468                            },
1469                            KeyValue {
1470                                key: "http.request.url".to_string(),
1471                                value_type: ValueType::String,
1472                                value: Value::String("/data".to_string()),
1473                            },
1474                            KeyValue {
1475                                key: "http.status_code".to_string(),
1476                                value_type: ValueType::Int64,
1477                                value: Value::Int64(200),
1478                            },
1479                        ],
1480                        process_id: "p1".to_string(),
1481                        ..Default::default()
1482                    },
1483                    Span {
1484                        trace_id: "5611dce1bc9ebed65352d99a027b08ea".to_string(),
1485                        span_id: "ffa03416a7b9ea48".to_string(),
1486                        operation_name: "access-redis".to_string(),
1487                        start_time: 1738726754642422,
1488                        duration: 100000,
1489                        tags: vec![
1490                            KeyValue {
1491                                key: "http.request.method".to_string(),
1492                                value_type: ValueType::String,
1493                                value: Value::String("POST".to_string()),
1494                            },
1495                            KeyValue {
1496                                key: "http.request.url".to_string(),
1497                                value_type: ValueType::String,
1498                                value: Value::String("/create".to_string()),
1499                            },
1500                            KeyValue {
1501                                key: "http.status_code".to_string(),
1502                                value_type: ValueType::Int64,
1503                                value: Value::Int64(400),
1504                            },
1505                        ],
1506                        process_id: "p1".to_string(),
1507                        ..Default::default()
1508                    },
1509                ],
1510                processes: HashMap::from([(
1511                    "p1".to_string(),
1512                    Process {
1513                        service_name: "test-service-0".to_string(),
1514                        tags: vec![],
1515                    },
1516                )]),
1517                ..Default::default()
1518            }],
1519        )];
1520
1521        for (records, expected) in tests {
1522            let traces = traces_from_records(records).unwrap();
1523            assert_eq!(traces, expected);
1524        }
1525    }
1526
1527    #[test]
1528    fn test_from_jaeger_query_params() {
1529        // The tests is the tuple of `(test_query_params, expected)`.
1530        let tests = vec![
1531            (
1532                JaegerQueryParams {
1533                    service_name: Some("test-service-0".to_string()),
1534                    ..Default::default()
1535                },
1536                QueryTraceParams {
1537                    service_name: "test-service-0".to_string(),
1538                    ..Default::default()
1539                },
1540            ),
1541            (
1542                JaegerQueryParams {
1543                    service_name: Some("test-service-0".to_string()),
1544                    operation_name: Some("access-mysql".to_string()),
1545                    start: Some(1738726754492422),
1546                    end: Some(1738726754642422),
1547                    max_duration: Some("100ms".to_string()),
1548                    min_duration: Some("50ms".to_string()),
1549                    limit: Some(10),
1550                    tags: Some("{\"http.status_code\":\"200\",\"latency\":\"11.234\",\"error\":\"false\",\"http.method\":\"GET\",\"http.path\":\"/api/v1/users\"}".to_string()),
1551                    ..Default::default()
1552                },
1553                QueryTraceParams {
1554                    service_name: "test-service-0".to_string(),
1555                    operation_name: Some("access-mysql".to_string()),
1556                    start_time: Some(1738726754492422000),
1557                    end_time: Some(1738726754642422000),
1558                    min_duration: Some(50000000),
1559                    max_duration: Some(100000000),
1560                    limit: Some(10),
1561                    tags: Some(HashMap::from([
1562                        ("http.status_code".to_string(), JsonValue::Number(Number::from(200))),
1563                        ("latency".to_string(), JsonValue::Number(Number::from_f64(11.234).unwrap())),
1564                        ("error".to_string(), JsonValue::Bool(false)),
1565                        ("http.method".to_string(), JsonValue::String("GET".to_string())),
1566                        ("http.path".to_string(), JsonValue::String("/api/v1/users".to_string())),
1567                    ])),
1568                },
1569            ),
1570        ];
1571
1572        for (query_params, expected) in tests {
1573            let query_params = QueryTraceParams::from_jaeger_query_params(query_params).unwrap();
1574            assert_eq!(query_params, expected);
1575        }
1576    }
1577
1578    #[test]
1579    fn test_check_schema() {
1580        // The tests is the tuple of `(test_records, expected_schema, is_ok)`.
1581        let tests = vec![(
1582            HttpRecordsOutput {
1583                schema: OutputSchema {
1584                    column_schemas: vec![
1585                        ColumnSchema {
1586                            name: "trace_id".to_string(),
1587                            data_type: "String".to_string(),
1588                        },
1589                        ColumnSchema {
1590                            name: "timestamp".to_string(),
1591                            data_type: "TimestampNanosecond".to_string(),
1592                        },
1593                        ColumnSchema {
1594                            name: "duration_nano".to_string(),
1595                            data_type: "UInt64".to_string(),
1596                        },
1597                        ColumnSchema {
1598                            name: "service_name".to_string(),
1599                            data_type: "String".to_string(),
1600                        },
1601                        ColumnSchema {
1602                            name: "span_name".to_string(),
1603                            data_type: "String".to_string(),
1604                        },
1605                        ColumnSchema {
1606                            name: "span_id".to_string(),
1607                            data_type: "String".to_string(),
1608                        },
1609                        ColumnSchema {
1610                            name: "span_attributes".to_string(),
1611                            data_type: "Json".to_string(),
1612                        },
1613                    ],
1614                },
1615                rows: vec![],
1616                total_rows: 0,
1617                metrics: HashMap::new(),
1618            },
1619            vec![
1620                (TRACE_ID_COLUMN, "String"),
1621                (TIMESTAMP_COLUMN, "TimestampNanosecond"),
1622                (DURATION_NANO_COLUMN, "UInt64"),
1623                (SERVICE_NAME_COLUMN, "String"),
1624                (SPAN_NAME_COLUMN, "String"),
1625                (SPAN_ID_COLUMN, "String"),
1626                (SPAN_ATTRIBUTES_COLUMN, "Json"),
1627            ],
1628            true,
1629        )];
1630
1631        for (records, expected_schema, is_ok) in tests {
1632            let result = check_schema(&records, &expected_schema);
1633            assert_eq!(result.is_ok(), is_ok);
1634        }
1635    }
1636
1637    #[test]
1638    fn test_normalize_span_kind() {
1639        let tests = vec![
1640            ("SPAN_KIND_SERVER".to_string(), "server".to_string()),
1641            ("SPAN_KIND_CLIENT".to_string(), "client".to_string()),
1642        ];
1643
1644        for (input, expected) in tests {
1645            let result = normalize_span_kind(&input);
1646            assert_eq!(result, expected);
1647        }
1648    }
1649
1650    #[test]
1651    fn test_convert_string_to_number() {
1652        let tests = vec![
1653            (
1654                JsonValue::String("123".to_string()),
1655                Some(JsonValue::Number(Number::from(123))),
1656            ),
1657            (
1658                JsonValue::String("123.456".to_string()),
1659                Some(JsonValue::Number(Number::from_f64(123.456).unwrap())),
1660            ),
1661        ];
1662
1663        for (input, expected) in tests {
1664            let result = convert_string_to_number(&input);
1665            assert_eq!(result, expected);
1666        }
1667    }
1668
1669    #[test]
1670    fn test_convert_string_to_boolean() {
1671        let tests = vec![
1672            (
1673                JsonValue::String("true".to_string()),
1674                Some(JsonValue::Bool(true)),
1675            ),
1676            (
1677                JsonValue::String("false".to_string()),
1678                Some(JsonValue::Bool(false)),
1679            ),
1680        ];
1681
1682        for (input, expected) in tests {
1683            let result = convert_string_to_boolean(&input);
1684            assert_eq!(result, expected);
1685        }
1686    }
1687}