1use std::collections::HashMap;
16use std::sync::Arc;
17
18use axum::extract::{Path, Query, State};
19use axum::http::{HeaderMap, StatusCode as HttpStatusCode};
20use axum::response::IntoResponse;
21use axum::Extension;
22use chrono::Utc;
23use common_catalog::consts::{PARENT_SPAN_ID_COLUMN, TRACE_TABLE_NAME};
24use common_error::ext::ErrorExt;
25use common_error::status_code::StatusCode;
26use common_query::{Output, OutputData};
27use common_recordbatch::util;
28use common_telemetry::{debug, error, tracing, warn};
29use serde::{Deserialize, Serialize};
30use serde_json::Value as JsonValue;
31use session::context::{Channel, QueryContext};
32use snafu::{OptionExt, ResultExt};
33
34use crate::error::{
35 status_code_to_http_status, CollectRecordbatchSnafu, Error, InvalidJaegerQuerySnafu, Result,
36};
37use crate::http::extractor::TraceTableName;
38use crate::http::HttpRecordsOutput;
39use crate::metrics::METRIC_JAEGER_QUERY_ELAPSED;
40use crate::otlp::trace::{
41 DURATION_NANO_COLUMN, KEY_OTEL_SCOPE_NAME, KEY_OTEL_SCOPE_VERSION, KEY_OTEL_STATUS_CODE,
42 KEY_SERVICE_NAME, KEY_SPAN_KIND, RESOURCE_ATTRIBUTES_COLUMN, SCOPE_NAME_COLUMN,
43 SCOPE_VERSION_COLUMN, SERVICE_NAME_COLUMN, SPAN_ATTRIBUTES_COLUMN, SPAN_EVENTS_COLUMN,
44 SPAN_ID_COLUMN, SPAN_KIND_COLUMN, SPAN_KIND_PREFIX, SPAN_NAME_COLUMN, SPAN_STATUS_CODE,
45 SPAN_STATUS_PREFIX, SPAN_STATUS_UNSET, TIMESTAMP_COLUMN, TRACE_ID_COLUMN,
46};
47use crate::query_handler::JaegerQueryHandlerRef;
48
49pub const JAEGER_QUERY_TABLE_NAME_KEY: &str = "jaeger_query_table_name";
50
51const REF_TYPE_CHILD_OF: &str = "CHILD_OF";
52const SPAN_KIND_TIME_FMTS: [&str; 2] = ["%Y-%m-%d %H:%M:%S%.6f%z", "%Y-%m-%d %H:%M:%S%.9f%z"];
53pub const JAEGER_TIME_RANGE_FOR_OPERATIONS_HEADER: &str = "x-greptime-jaeger-query-time-range";
54
55#[derive(Default, Debug, Serialize, Deserialize, PartialEq)]
58pub struct JaegerAPIResponse {
59 pub data: Option<JaegerData>,
60 pub total: usize,
61 pub limit: usize,
62 pub offset: usize,
63 pub errors: Vec<JaegerAPIError>,
64}
65
66#[derive(Debug, Serialize, Deserialize, PartialEq)]
68#[serde(untagged)]
69pub enum JaegerData {
70 ServiceNames(Vec<String>),
71 OperationsNames(Vec<String>),
72 Operations(Vec<Operation>),
73 Traces(Vec<Trace>),
74}
75
76#[derive(Default, Debug, Serialize, Deserialize, PartialEq)]
78#[serde(rename_all = "camelCase")]
79pub struct JaegerAPIError {
80 pub code: i32,
81 pub msg: String,
82 #[serde(skip_serializing_if = "Option::is_none")]
83 pub trace_id: Option<String>,
84}
85
86#[derive(Debug, Default, Serialize, Deserialize, PartialEq)]
88#[serde(rename_all = "camelCase")]
89pub struct Operation {
90 pub name: String,
91 #[serde(skip_serializing_if = "Option::is_none")]
92 pub span_kind: Option<String>,
93}
94
95#[derive(Debug, Default, Serialize, Deserialize, PartialEq)]
97#[serde(rename_all = "camelCase")]
98pub struct Trace {
99 #[serde(rename = "traceID")]
100 pub trace_id: String,
101 pub spans: Vec<Span>,
102
103 #[serde(skip_serializing_if = "HashMap::is_empty")]
104 pub processes: HashMap<String, Process>,
105
106 #[serde(skip_serializing_if = "Vec::is_empty")]
107 pub warnings: Vec<String>,
108}
109
110#[derive(Debug, Default, Serialize, Deserialize, PartialEq)]
112#[serde(rename_all = "camelCase")]
113pub struct Span {
114 #[serde(rename = "traceID")]
115 pub trace_id: String,
116
117 #[serde(rename = "spanID")]
118 pub span_id: String,
119
120 #[serde(rename = "parentSpanID")]
121 #[serde(skip_serializing_if = "String::is_empty")]
122 pub parent_span_id: String,
123
124 #[serde(skip_serializing_if = "Option::is_none")]
125 pub flags: Option<u32>,
126
127 pub operation_name: String,
128 pub references: Vec<Reference>,
129 pub start_time: u64, pub duration: u64, pub tags: Vec<KeyValue>,
132 pub logs: Vec<Log>,
133
134 #[serde(rename = "processID")]
135 #[serde(skip_serializing_if = "String::is_empty")]
136 pub process_id: String,
137
138 #[serde(skip_serializing_if = "Option::is_none")]
139 pub process: Option<Process>,
140
141 #[serde(skip_serializing_if = "Vec::is_empty")]
142 pub warnings: Vec<String>,
143}
144
145#[derive(Debug, Serialize, Deserialize, PartialEq)]
147#[serde(rename_all = "camelCase")]
148pub struct Reference {
149 #[serde(rename = "traceID")]
150 pub trace_id: String,
151 #[serde(rename = "spanID")]
152 pub span_id: String,
153 pub ref_type: String,
154}
155
156#[derive(Debug, Serialize, Deserialize, PartialEq)]
158#[serde(rename_all = "camelCase")]
159pub struct Process {
160 pub service_name: String,
161 pub tags: Vec<KeyValue>,
162}
163
164#[derive(Debug, Serialize, Deserialize, PartialEq)]
166#[serde(rename_all = "camelCase")]
167pub struct Log {
168 pub timestamp: u64,
169 pub fields: Vec<KeyValue>,
170}
171
172#[derive(Debug, Serialize, Deserialize, PartialEq)]
174#[serde(rename_all = "camelCase")]
175pub struct KeyValue {
176 pub key: String,
177 #[serde(rename = "type")]
178 pub value_type: ValueType,
179 pub value: Value,
180}
181
182#[derive(Debug, Serialize, Deserialize, PartialEq)]
184#[serde(untagged)]
185#[serde(rename_all = "camelCase")]
186pub enum Value {
187 String(String),
188 Int64(i64),
189 Float64(f64),
190 Boolean(bool),
191 Binary(Vec<u8>),
192}
193
194#[derive(Debug, Serialize, Deserialize, PartialEq)]
196#[serde(rename_all = "lowercase")]
197pub enum ValueType {
198 String,
199 Int64,
200 Float64,
201 Boolean,
202 Binary,
203}
204
205#[derive(Default, Debug, Serialize, Deserialize)]
207#[serde(rename_all = "camelCase")]
208pub struct JaegerQueryParams {
209 #[serde(rename = "service")]
211 pub service_name: Option<String>,
212
213 #[serde(rename = "operation")]
215 pub operation_name: Option<String>,
216
217 pub limit: Option<usize>,
219
220 pub start: Option<i64>,
222
223 pub end: Option<i64>,
225
226 pub max_duration: Option<String>,
228
229 pub min_duration: Option<String>,
231
232 pub tags: Option<String>,
236
237 pub span_kind: Option<String>,
239}
240
241fn update_query_context(query_ctx: &mut QueryContext, table_name: Option<String>) {
242 query_ctx.set_channel(Channel::Jaeger);
244 if let Some(table) = table_name {
245 query_ctx.set_extension(JAEGER_QUERY_TABLE_NAME_KEY, table);
246 }
247}
248
249impl QueryTraceParams {
250 fn from_jaeger_query_params(query_params: JaegerQueryParams) -> Result<Self> {
251 let mut internal_query_params: QueryTraceParams = QueryTraceParams {
252 service_name: query_params.service_name.context(InvalidJaegerQuerySnafu {
253 reason: "service_name is required".to_string(),
254 })?,
255 operation_name: query_params.operation_name,
256 start_time: query_params.start.map(|start| start * 1000),
258 end_time: query_params.end.map(|end| end * 1000),
259 ..Default::default()
260 };
261
262 if let Some(max_duration) = query_params.max_duration {
263 let duration = humantime::parse_duration(&max_duration).map_err(|e| {
264 InvalidJaegerQuerySnafu {
265 reason: format!("parse maxDuration '{}' failed: {}", max_duration, e),
266 }
267 .build()
268 })?;
269 internal_query_params.max_duration = Some(duration.as_nanos() as u64);
270 }
271
272 if let Some(min_duration) = query_params.min_duration {
273 let duration = humantime::parse_duration(&min_duration).map_err(|e| {
274 InvalidJaegerQuerySnafu {
275 reason: format!("parse minDuration '{}' failed: {}", min_duration, e),
276 }
277 .build()
278 })?;
279 internal_query_params.min_duration = Some(duration.as_nanos() as u64);
280 }
281
282 if let Some(tags) = query_params.tags {
283 let mut tags_map: HashMap<String, JsonValue> =
285 serde_json::from_str(&tags).map_err(|e| {
286 InvalidJaegerQuerySnafu {
287 reason: format!("parse tags '{}' failed: {}", tags, e),
288 }
289 .build()
290 })?;
291 for (_, v) in tags_map.iter_mut() {
292 if let Some(number) = convert_string_to_number(v) {
293 *v = number;
294 }
295 if let Some(boolean) = convert_string_to_boolean(v) {
296 *v = boolean;
297 }
298 }
299 internal_query_params.tags = Some(tags_map);
300 }
301
302 internal_query_params.limit = query_params.limit;
303
304 Ok(internal_query_params)
305 }
306}
307
308#[derive(Debug, Default, PartialEq)]
309pub struct QueryTraceParams {
310 pub service_name: String,
311 pub operation_name: Option<String>,
312
313 pub limit: Option<usize>,
315
316 pub tags: Option<HashMap<String, JsonValue>>,
318
319 pub start_time: Option<i64>,
321 pub end_time: Option<i64>,
322 pub min_duration: Option<u64>,
323 pub max_duration: Option<u64>,
324}
325
326#[axum_macros::debug_handler]
328#[tracing::instrument(skip_all, fields(protocol = "jaeger", request_type = "get_services"))]
329pub async fn handle_get_services(
330 State(handler): State<JaegerQueryHandlerRef>,
331 Query(query_params): Query<JaegerQueryParams>,
332 Extension(mut query_ctx): Extension<QueryContext>,
333 TraceTableName(table_name): TraceTableName,
334) -> impl IntoResponse {
335 debug!(
336 "Received Jaeger '/api/services' request, query_params: {:?}, query_ctx: {:?}",
337 query_params, query_ctx
338 );
339
340 query_ctx.set_channel(Channel::Jaeger);
341 if let Some(table) = table_name {
342 query_ctx.set_extension(JAEGER_QUERY_TABLE_NAME_KEY, table);
343 }
344
345 let query_ctx = Arc::new(query_ctx);
346 let db = query_ctx.get_db_string();
347
348 let _timer = METRIC_JAEGER_QUERY_ELAPSED
350 .with_label_values(&[&db, "/api/services"])
351 .start_timer();
352
353 match handler.get_services(query_ctx).await {
354 Ok(output) => match covert_to_records(output).await {
355 Ok(Some(records)) => match services_from_records(records) {
356 Ok(services) => {
357 let services_num = services.len();
358 (
359 HttpStatusCode::OK,
360 axum::Json(JaegerAPIResponse {
361 data: Some(JaegerData::ServiceNames(services)),
362 total: services_num,
363 ..Default::default()
364 }),
365 )
366 }
367 Err(err) => {
368 error!("Failed to get services: {:?}", err);
369 error_response(err)
370 }
371 },
372 Ok(None) => (HttpStatusCode::OK, axum::Json(JaegerAPIResponse::default())),
373 Err(err) => {
374 error!("Failed to get services: {:?}", err);
375 error_response(err)
376 }
377 },
378 Err(err) => handle_query_error(err, "Failed to get services", &db),
379 }
380}
381
382#[axum_macros::debug_handler]
384#[tracing::instrument(skip_all, fields(protocol = "jaeger", request_type = "get_trace"))]
385pub async fn handle_get_trace(
386 State(handler): State<JaegerQueryHandlerRef>,
387 Path(trace_id): Path<String>,
388 Query(query_params): Query<JaegerQueryParams>,
389 Extension(mut query_ctx): Extension<QueryContext>,
390 TraceTableName(table_name): TraceTableName,
391) -> impl IntoResponse {
392 debug!(
393 "Received Jaeger '/api/traces/{}' request, query_params: {:?}, query_ctx: {:?}",
394 trace_id, query_params, query_ctx
395 );
396
397 update_query_context(&mut query_ctx, table_name);
398 let query_ctx = Arc::new(query_ctx);
399 let db = query_ctx.get_db_string();
400
401 let _timer = METRIC_JAEGER_QUERY_ELAPSED
403 .with_label_values(&[&db, "/api/traces"])
404 .start_timer();
405
406 let output = match handler.get_trace(query_ctx, &trace_id).await {
407 Ok(output) => output,
408 Err(err) => {
409 return handle_query_error(
410 err,
411 &format!("Failed to get trace for '{}'", trace_id),
412 &db,
413 );
414 }
415 };
416
417 match covert_to_records(output).await {
418 Ok(Some(records)) => match traces_from_records(records) {
419 Ok(traces) => (
420 HttpStatusCode::OK,
421 axum::Json(JaegerAPIResponse {
422 data: Some(JaegerData::Traces(traces)),
423 ..Default::default()
424 }),
425 ),
426 Err(err) => {
427 error!("Failed to get trace '{}': {:?}", trace_id, err);
428 error_response(err)
429 }
430 },
431 Ok(None) => (HttpStatusCode::OK, axum::Json(JaegerAPIResponse::default())),
432 Err(err) => {
433 error!("Failed to get trace '{}': {:?}", trace_id, err);
434 error_response(err)
435 }
436 }
437}
438
439#[axum_macros::debug_handler]
441#[tracing::instrument(skip_all, fields(protocol = "jaeger", request_type = "find_traces"))]
442pub async fn handle_find_traces(
443 State(handler): State<JaegerQueryHandlerRef>,
444 Query(query_params): Query<JaegerQueryParams>,
445 Extension(mut query_ctx): Extension<QueryContext>,
446 TraceTableName(table_name): TraceTableName,
447) -> impl IntoResponse {
448 debug!(
449 "Received Jaeger '/api/traces' request, query_params: {:?}, query_ctx: {:?}",
450 query_params, query_ctx
451 );
452
453 update_query_context(&mut query_ctx, table_name);
454 let query_ctx = Arc::new(query_ctx);
455 let db = query_ctx.get_db_string();
456
457 let _timer = METRIC_JAEGER_QUERY_ELAPSED
459 .with_label_values(&[&db, "/api/traces"])
460 .start_timer();
461
462 match QueryTraceParams::from_jaeger_query_params(query_params) {
463 Ok(query_params) => {
464 let output = handler.find_traces(query_ctx, query_params).await;
465 match output {
466 Ok(output) => match covert_to_records(output).await {
467 Ok(Some(records)) => match traces_from_records(records) {
468 Ok(traces) => (
469 HttpStatusCode::OK,
470 axum::Json(JaegerAPIResponse {
471 data: Some(JaegerData::Traces(traces)),
472 ..Default::default()
473 }),
474 ),
475 Err(err) => {
476 error!("Failed to find traces: {:?}", err);
477 error_response(err)
478 }
479 },
480 Ok(None) => (HttpStatusCode::OK, axum::Json(JaegerAPIResponse::default())),
481 Err(err) => error_response(err),
482 },
483 Err(err) => handle_query_error(err, "Failed to find traces", &db),
484 }
485 }
486 Err(e) => error_response(e),
487 }
488}
489
490#[axum_macros::debug_handler]
492#[tracing::instrument(skip_all, fields(protocol = "jaeger", request_type = "get_operations"))]
493pub async fn handle_get_operations(
494 State(handler): State<JaegerQueryHandlerRef>,
495 Query(query_params): Query<JaegerQueryParams>,
496 Extension(mut query_ctx): Extension<QueryContext>,
497 TraceTableName(table_name): TraceTableName,
498 headers: HeaderMap,
499) -> impl IntoResponse {
500 debug!(
501 "Received Jaeger '/api/operations' request, query_params: {:?}, query_ctx: {:?}, headers: {:?}",
502 query_params, query_ctx, headers
503 );
504
505 let (start, end) = match parse_jaeger_time_range_for_operations(&headers, &query_params) {
506 Ok((start, end)) => (start, end),
507 Err(e) => return error_response(e),
508 };
509
510 debug!("Get operations with start: {:?}, end: {:?}", start, end);
511
512 if let Some(service_name) = &query_params.service_name {
513 update_query_context(&mut query_ctx, table_name);
514 let query_ctx = Arc::new(query_ctx);
515 let db = query_ctx.get_db_string();
516
517 let _timer = METRIC_JAEGER_QUERY_ELAPSED
519 .with_label_values(&[&db, "/api/operations"])
520 .start_timer();
521
522 match handler
523 .get_operations(
524 query_ctx,
525 service_name,
526 query_params.span_kind.as_deref(),
527 start,
528 end,
529 )
530 .await
531 {
532 Ok(output) => match covert_to_records(output).await {
533 Ok(Some(records)) => match operations_from_records(records, true) {
534 Ok(operations) => {
535 let total = operations.len();
536 (
537 HttpStatusCode::OK,
538 axum::Json(JaegerAPIResponse {
539 data: Some(JaegerData::Operations(operations)),
540 total,
541 ..Default::default()
542 }),
543 )
544 }
545 Err(err) => {
546 error!("Failed to get operations: {:?}", err);
547 error_response(err)
548 }
549 },
550 Ok(None) => (HttpStatusCode::OK, axum::Json(JaegerAPIResponse::default())),
551 Err(err) => error_response(err),
552 },
553 Err(err) => handle_query_error(
554 err,
555 &format!("Failed to get operations for service '{}'", service_name),
556 &db,
557 ),
558 }
559 } else {
560 (
561 HttpStatusCode::BAD_REQUEST,
562 axum::Json(JaegerAPIResponse {
563 errors: vec![JaegerAPIError {
564 code: 400,
565 msg: "parameter 'service' is required".to_string(),
566 trace_id: None,
567 }],
568 ..Default::default()
569 }),
570 )
571 }
572}
573
574#[axum_macros::debug_handler]
576#[tracing::instrument(
577 skip_all,
578 fields(protocol = "jaeger", request_type = "get_operations_by_service")
579)]
580pub async fn handle_get_operations_by_service(
581 State(handler): State<JaegerQueryHandlerRef>,
582 Path(service_name): Path<String>,
583 Query(query_params): Query<JaegerQueryParams>,
584 Extension(mut query_ctx): Extension<QueryContext>,
585 TraceTableName(table_name): TraceTableName,
586 headers: HeaderMap,
587) -> impl IntoResponse {
588 debug!(
589 "Received Jaeger '/api/services/{}/operations' request, query_params: {:?}, query_ctx: {:?}, headers: {:?}",
590 service_name, query_params, query_ctx, headers
591 );
592
593 update_query_context(&mut query_ctx, table_name);
594 let query_ctx = Arc::new(query_ctx);
595 let db = query_ctx.get_db_string();
596
597 let _timer = METRIC_JAEGER_QUERY_ELAPSED
599 .with_label_values(&[&db, "/api/services"])
600 .start_timer();
601
602 let (start, end) = match parse_jaeger_time_range_for_operations(&headers, &query_params) {
603 Ok((start, end)) => (start, end),
604 Err(e) => return error_response(e),
605 };
606
607 match handler
608 .get_operations(query_ctx, &service_name, None, start, end)
609 .await
610 {
611 Ok(output) => match covert_to_records(output).await {
612 Ok(Some(records)) => match operations_from_records(records, false) {
613 Ok(operations) => {
614 let operations: Vec<String> =
615 operations.into_iter().map(|op| op.name).collect();
616 let total = operations.len();
617 (
618 HttpStatusCode::OK,
619 axum::Json(JaegerAPIResponse {
620 data: Some(JaegerData::OperationsNames(operations)),
621 total,
622 ..Default::default()
623 }),
624 )
625 }
626 Err(err) => {
627 error!(
628 "Failed to get operations for service '{}': {:?}",
629 service_name, err
630 );
631 error_response(err)
632 }
633 },
634 Ok(None) => (HttpStatusCode::OK, axum::Json(JaegerAPIResponse::default())),
635 Err(err) => error_response(err),
636 },
637 Err(err) => handle_query_error(
638 err,
639 &format!("Failed to get operations for service '{}'", service_name),
640 &db,
641 ),
642 }
643}
644
645async fn covert_to_records(output: Output) -> Result<Option<HttpRecordsOutput>> {
646 match output.data {
647 OutputData::Stream(stream) => {
648 let records = HttpRecordsOutput::try_new(
649 stream.schema().clone(),
650 util::collect(stream)
651 .await
652 .context(CollectRecordbatchSnafu)?,
653 )?;
654 debug!("The query records: {:?}", records);
655 Ok(Some(records))
656 }
657 _ => Ok(None),
659 }
660}
661
662fn handle_query_error(
663 err: Error,
664 prompt: &str,
665 db: &str,
666) -> (HttpStatusCode, axum::Json<JaegerAPIResponse>) {
667 if err.status_code() == StatusCode::TableNotFound {
669 warn!(
670 "No trace table '{}' found in database '{}'",
671 TRACE_TABLE_NAME, db
672 );
673 (HttpStatusCode::OK, axum::Json(JaegerAPIResponse::default()))
674 } else {
675 error!("{}: {:?}", prompt, err);
676 error_response(err)
677 }
678}
679
680fn error_response(err: Error) -> (HttpStatusCode, axum::Json<JaegerAPIResponse>) {
681 (
682 status_code_to_http_status(&err.status_code()),
683 axum::Json(JaegerAPIResponse {
684 errors: vec![JaegerAPIError {
685 code: err.status_code() as i32,
686 msg: err.to_string(),
687 ..Default::default()
688 }],
689 ..Default::default()
690 }),
691 )
692}
693
694fn traces_from_records(records: HttpRecordsOutput) -> Result<Vec<Trace>> {
695 let mut trace_id_to_processes: HashMap<String, HashMap<String, String>> = HashMap::new();
697 let mut trace_id_to_spans: HashMap<String, Vec<Span>> = HashMap::new();
699 let mut service_to_resource_attributes: HashMap<String, Vec<KeyValue>> = HashMap::new();
701
702 let is_span_attributes_flatten = !records
703 .schema
704 .column_schemas
705 .iter()
706 .any(|c| c.name == SPAN_ATTRIBUTES_COLUMN);
707
708 for row in records.rows.into_iter() {
709 let mut span = Span::default();
710 let mut service_name = None;
711 let mut resource_tags = vec![];
712
713 for (idx, cell) in row.into_iter().enumerate() {
714 let column_name = &records.schema.column_schemas[idx].name;
716
717 match column_name.as_str() {
718 TRACE_ID_COLUMN => {
719 if let JsonValue::String(trace_id) = cell {
720 span.trace_id = trace_id.clone();
721 trace_id_to_processes.entry(trace_id).or_default();
722 }
723 }
724 TIMESTAMP_COLUMN => {
725 span.start_time = cell.as_u64().context(InvalidJaegerQuerySnafu {
726 reason: "Failed to convert timestamp to u64".to_string(),
727 })? / 1000;
728 }
729 DURATION_NANO_COLUMN => {
730 span.duration = cell.as_u64().context(InvalidJaegerQuerySnafu {
731 reason: "Failed to convert duration to u64".to_string(),
732 })? / 1000;
733 }
734 SERVICE_NAME_COLUMN => {
735 if let JsonValue::String(name) = cell {
736 service_name = Some(name);
737 }
738 }
739 SPAN_NAME_COLUMN => {
740 if let JsonValue::String(span_name) = cell {
741 span.operation_name = span_name;
742 }
743 }
744 SPAN_ID_COLUMN => {
745 if let JsonValue::String(span_id) = cell {
746 span.span_id = span_id;
747 }
748 }
749 SPAN_ATTRIBUTES_COLUMN => {
750 if let JsonValue::Object(span_attrs) = cell {
753 span.tags.extend(object_to_tags(span_attrs));
754 }
755 }
756 RESOURCE_ATTRIBUTES_COLUMN => {
757 if let JsonValue::Object(mut resource_attrs) = cell {
761 resource_attrs.remove(KEY_SERVICE_NAME);
762 resource_tags = object_to_tags(resource_attrs);
763 }
764 }
765 PARENT_SPAN_ID_COLUMN => {
766 if let JsonValue::String(parent_span_id) = cell {
767 if !parent_span_id.is_empty() {
768 span.references.push(Reference {
769 trace_id: span.trace_id.clone(),
770 span_id: parent_span_id,
771 ref_type: REF_TYPE_CHILD_OF.to_string(),
772 });
773 }
774 }
775 }
776 SPAN_EVENTS_COLUMN => {
777 if let JsonValue::Array(events) = cell {
778 for event in events {
779 if let JsonValue::Object(mut obj) = event {
780 let Some(action) = obj.get("name").and_then(|v| v.as_str()) else {
781 continue;
782 };
783
784 let Some(t) =
785 obj.get("time").and_then(|t| t.as_str()).and_then(|s| {
786 SPAN_KIND_TIME_FMTS
787 .iter()
788 .find_map(|fmt| {
789 chrono::DateTime::parse_from_str(s, fmt).ok()
790 })
791 .map(|dt| dt.timestamp_micros() as u64)
792 })
793 else {
794 continue;
795 };
796
797 let mut fields = vec![KeyValue {
798 key: "event".to_string(),
799 value_type: ValueType::String,
800 value: Value::String(action.to_string()),
801 }];
802
803 if let Some(JsonValue::Object(attrs)) = obj.remove("attributes") {
805 fields.extend(object_to_tags(attrs));
806 }
807
808 span.logs.push(Log {
809 timestamp: t,
810 fields,
811 });
812 }
813 }
814 }
815 }
816 SCOPE_NAME_COLUMN => {
817 if let JsonValue::String(scope_name) = cell {
818 if !scope_name.is_empty() {
819 span.tags.push(KeyValue {
820 key: KEY_OTEL_SCOPE_NAME.to_string(),
821 value_type: ValueType::String,
822 value: Value::String(scope_name),
823 });
824 }
825 }
826 }
827 SCOPE_VERSION_COLUMN => {
828 if let JsonValue::String(scope_version) = cell {
829 if !scope_version.is_empty() {
830 span.tags.push(KeyValue {
831 key: KEY_OTEL_SCOPE_VERSION.to_string(),
832 value_type: ValueType::String,
833 value: Value::String(scope_version),
834 });
835 }
836 }
837 }
838 SPAN_KIND_COLUMN => {
839 if let JsonValue::String(span_kind) = cell {
840 if !span_kind.is_empty() {
841 span.tags.push(KeyValue {
842 key: KEY_SPAN_KIND.to_string(),
843 value_type: ValueType::String,
844 value: Value::String(normalize_span_kind(&span_kind)),
845 });
846 }
847 }
848 }
849 SPAN_STATUS_CODE => {
850 if let JsonValue::String(span_status) = cell {
851 if span_status != SPAN_STATUS_UNSET && !span_status.is_empty() {
852 span.tags.push(KeyValue {
853 key: KEY_OTEL_STATUS_CODE.to_string(),
854 value_type: ValueType::String,
855 value: Value::String(normalize_status_code(&span_status)),
856 });
857 }
858 }
859 }
860
861 _ => {
862 if is_span_attributes_flatten {
864 const SPAN_ATTR_PREFIX: &str = "span_attributes.";
865 const RESOURCE_ATTR_PREFIX: &str = "resource_attributes.";
866 if column_name.starts_with(SPAN_ATTR_PREFIX) {
868 if let Some(keyvalue) = to_keyvalue(
869 column_name
870 .strip_prefix(SPAN_ATTR_PREFIX)
871 .unwrap_or_default()
872 .to_string(),
873 cell,
874 ) {
875 span.tags.push(keyvalue);
876 }
877 } else if column_name.starts_with(RESOURCE_ATTR_PREFIX) {
878 if let Some(keyvalue) = to_keyvalue(
879 column_name
880 .strip_prefix(RESOURCE_ATTR_PREFIX)
881 .unwrap_or_default()
882 .to_string(),
883 cell,
884 ) {
885 resource_tags.push(keyvalue);
886 }
887 }
888 }
889 }
890 }
891 }
892
893 if let Some(service_name) = service_name {
894 if !service_to_resource_attributes.contains_key(&service_name) {
895 service_to_resource_attributes.insert(service_name.clone(), resource_tags);
896 }
897
898 if let Some(process) = trace_id_to_processes.get_mut(&span.trace_id) {
899 if let Some(process_id) = process.get(&service_name) {
900 span.process_id = process_id.clone();
901 } else {
902 let process_id = format!("p{}", process.len() + 1);
904 process.insert(service_name, process_id.clone());
905 span.process_id = process_id;
906 }
907 }
908 }
909
910 span.tags.sort_by(|a, b| a.key.cmp(&b.key));
912
913 if let Some(spans) = trace_id_to_spans.get_mut(&span.trace_id) {
914 spans.push(span);
915 } else {
916 trace_id_to_spans.insert(span.trace_id.clone(), vec![span]);
917 }
918 }
919
920 let mut traces = Vec::new();
921 for (trace_id, spans) in trace_id_to_spans {
922 let mut trace = Trace {
923 trace_id,
924 spans,
925 ..Default::default()
926 };
927
928 if let Some(processes) = trace_id_to_processes.remove(&trace.trace_id) {
929 let mut process_id_to_process = HashMap::new();
930 for (service_name, process_id) in processes.into_iter() {
931 let tags = service_to_resource_attributes
932 .remove(&service_name)
933 .unwrap_or_default();
934 process_id_to_process.insert(process_id, Process { service_name, tags });
935 }
936 trace.processes = process_id_to_process;
937 }
938 traces.push(trace);
939 }
940
941 Ok(traces)
942}
943
944fn to_keyvalue(key: String, value: JsonValue) -> Option<KeyValue> {
945 match value {
946 JsonValue::String(value) => Some(KeyValue {
947 key,
948 value_type: ValueType::String,
949 value: Value::String(value.to_string()),
950 }),
951 JsonValue::Number(value) => Some(KeyValue {
952 key,
953 value_type: ValueType::Int64,
954 value: Value::Int64(value.as_i64().unwrap_or(0)),
955 }),
956 JsonValue::Bool(value) => Some(KeyValue {
957 key,
958 value_type: ValueType::Boolean,
959 value: Value::Boolean(value),
960 }),
961 JsonValue::Array(value) => Some(KeyValue {
962 key,
963 value_type: ValueType::String,
964 value: Value::String(serde_json::to_string(&value).unwrap()),
965 }),
966 JsonValue::Object(value) => Some(KeyValue {
967 key,
968 value_type: ValueType::String,
969 value: Value::String(serde_json::to_string(&value).unwrap()),
970 }),
971 JsonValue::Null => None,
972 }
973}
974
975fn object_to_tags(object: serde_json::map::Map<String, JsonValue>) -> Vec<KeyValue> {
976 object
977 .into_iter()
978 .filter_map(|(key, value)| to_keyvalue(key, value))
979 .collect()
980}
981
982fn services_from_records(records: HttpRecordsOutput) -> Result<Vec<String>> {
983 let expected_schema = vec![(SERVICE_NAME_COLUMN, "String")];
984 check_schema(&records, &expected_schema)?;
985
986 let mut services = Vec::with_capacity(records.total_rows);
987 for row in records.rows.into_iter() {
988 for value in row.into_iter() {
989 if let JsonValue::String(service_name) = value {
990 services.push(service_name);
991 }
992 }
993 }
994 Ok(services)
995}
996
997fn operations_from_records(
999 records: HttpRecordsOutput,
1000 contain_span_kind: bool,
1001) -> Result<Vec<Operation>> {
1002 let expected_schema = vec![(SPAN_NAME_COLUMN, "String"), (SPAN_KIND_COLUMN, "String")];
1003 check_schema(&records, &expected_schema)?;
1004
1005 let mut operations = Vec::with_capacity(records.total_rows);
1006 for row in records.rows.into_iter() {
1007 let mut row_iter = row.into_iter();
1008 if let Some(JsonValue::String(operation)) = row_iter.next() {
1009 let mut operation = Operation {
1010 name: operation,
1011 span_kind: None,
1012 };
1013 if contain_span_kind {
1014 if let Some(JsonValue::String(span_kind)) = row_iter.next() {
1015 operation.span_kind = Some(normalize_span_kind(&span_kind));
1016 }
1017 } else {
1018 row_iter.next();
1020 }
1021 operations.push(operation);
1022 }
1023 }
1024
1025 Ok(operations)
1026}
1027
1028fn check_schema(records: &HttpRecordsOutput, expected_schema: &[(&str, &str)]) -> Result<()> {
1030 for (i, column) in records.schema.column_schemas.iter().enumerate() {
1031 if column.name != expected_schema[i].0 || column.data_type != expected_schema[i].1 {
1032 InvalidJaegerQuerySnafu {
1033 reason: "query result schema is not correct".to_string(),
1034 }
1035 .fail()?
1036 }
1037 }
1038 Ok(())
1039}
1040
1041fn normalize_span_kind(span_kind: &str) -> String {
1044 if let Some(stripped) = span_kind.strip_prefix(SPAN_KIND_PREFIX) {
1046 stripped.to_lowercase()
1047 } else {
1048 span_kind.to_lowercase()
1050 }
1051}
1052
1053fn normalize_status_code(status_code: &str) -> String {
1056 if let Some(stripped) = status_code.strip_prefix(SPAN_STATUS_PREFIX) {
1058 stripped.to_string()
1059 } else {
1060 status_code.to_string()
1062 }
1063}
1064
1065fn convert_string_to_number(input: &serde_json::Value) -> Option<serde_json::Value> {
1066 if let Some(data) = input.as_str() {
1067 if let Ok(number) = data.parse::<i64>() {
1068 return Some(serde_json::Value::Number(serde_json::Number::from(number)));
1069 }
1070 if let Ok(number) = data.parse::<f64>() {
1071 if let Some(number) = serde_json::Number::from_f64(number) {
1072 return Some(serde_json::Value::Number(number));
1073 }
1074 }
1075 }
1076
1077 None
1078}
1079
1080fn convert_string_to_boolean(input: &serde_json::Value) -> Option<serde_json::Value> {
1081 if let Some(data) = input.as_str() {
1082 if data == "true" {
1083 return Some(serde_json::Value::Bool(true));
1084 }
1085 if data == "false" {
1086 return Some(serde_json::Value::Bool(false));
1087 }
1088 }
1089
1090 None
1091}
1092
1093fn parse_jaeger_time_range_for_operations(
1094 headers: &HeaderMap,
1095 query_params: &JaegerQueryParams,
1096) -> Result<(Option<i64>, Option<i64>)> {
1097 if let Some(time_range) = headers.get(JAEGER_TIME_RANGE_FOR_OPERATIONS_HEADER) {
1098 match time_range.to_str() {
1099 Ok(time_range) => match humantime::parse_duration(time_range) {
1100 Ok(duration) => {
1101 debug!(
1102 "Get operations with time range: {:?}, duration: {:?}",
1103 time_range, duration
1104 );
1105 let now = Utc::now().timestamp_micros();
1106 Ok((Some(now - duration.as_micros() as i64), Some(now)))
1107 }
1108 Err(e) => {
1109 error!("Failed to parse time range header: {:?}", e);
1110 Err(InvalidJaegerQuerySnafu {
1111 reason: format!("invalid time range header: {:?}", time_range),
1112 }
1113 .build())
1114 }
1115 },
1116 Err(e) => {
1117 error!("Failed to convert time range header to string: {:?}", e);
1118 Err(InvalidJaegerQuerySnafu {
1119 reason: format!("invalid time range header: {:?}", time_range),
1120 }
1121 .build())
1122 }
1123 }
1124 } else {
1125 Ok((query_params.start, query_params.end))
1126 }
1127}
1128
1129#[cfg(test)]
1130mod tests {
1131 use serde_json::{json, Number, Value as JsonValue};
1132
1133 use super::*;
1134 use crate::http::{ColumnSchema, HttpRecordsOutput, OutputSchema};
1135
1136 #[test]
1137 fn test_services_from_records() {
1138 let tests = vec![(
1140 HttpRecordsOutput {
1141 schema: OutputSchema {
1142 column_schemas: vec![ColumnSchema {
1143 name: "service_name".to_string(),
1144 data_type: "String".to_string(),
1145 }],
1146 },
1147 rows: vec![
1148 vec![JsonValue::String("test-service-0".to_string())],
1149 vec![JsonValue::String("test-service-1".to_string())],
1150 ],
1151 total_rows: 2,
1152 metrics: HashMap::new(),
1153 },
1154 vec!["test-service-0".to_string(), "test-service-1".to_string()],
1155 )];
1156
1157 for (records, expected) in tests {
1158 let services = services_from_records(records).unwrap();
1159 assert_eq!(services, expected);
1160 }
1161 }
1162
1163 #[test]
1164 fn test_operations_from_records() {
1165 let tests = vec![
1167 (
1168 HttpRecordsOutput {
1169 schema: OutputSchema {
1170 column_schemas: vec![
1171 ColumnSchema {
1172 name: "span_name".to_string(),
1173 data_type: "String".to_string(),
1174 },
1175 ColumnSchema {
1176 name: "span_kind".to_string(),
1177 data_type: "String".to_string(),
1178 },
1179 ],
1180 },
1181 rows: vec![
1182 vec![
1183 JsonValue::String("access-mysql".to_string()),
1184 JsonValue::String("SPAN_KIND_SERVER".to_string()),
1185 ],
1186 vec![
1187 JsonValue::String("access-redis".to_string()),
1188 JsonValue::String("SPAN_KIND_CLIENT".to_string()),
1189 ],
1190 ],
1191 total_rows: 2,
1192 metrics: HashMap::new(),
1193 },
1194 false,
1195 vec![
1196 Operation {
1197 name: "access-mysql".to_string(),
1198 span_kind: None,
1199 },
1200 Operation {
1201 name: "access-redis".to_string(),
1202 span_kind: None,
1203 },
1204 ],
1205 ),
1206 (
1207 HttpRecordsOutput {
1208 schema: OutputSchema {
1209 column_schemas: vec![
1210 ColumnSchema {
1211 name: "span_name".to_string(),
1212 data_type: "String".to_string(),
1213 },
1214 ColumnSchema {
1215 name: "span_kind".to_string(),
1216 data_type: "String".to_string(),
1217 },
1218 ],
1219 },
1220 rows: vec![
1221 vec![
1222 JsonValue::String("access-mysql".to_string()),
1223 JsonValue::String("SPAN_KIND_SERVER".to_string()),
1224 ],
1225 vec![
1226 JsonValue::String("access-redis".to_string()),
1227 JsonValue::String("SPAN_KIND_CLIENT".to_string()),
1228 ],
1229 ],
1230 total_rows: 2,
1231 metrics: HashMap::new(),
1232 },
1233 true,
1234 vec![
1235 Operation {
1236 name: "access-mysql".to_string(),
1237 span_kind: Some("server".to_string()),
1238 },
1239 Operation {
1240 name: "access-redis".to_string(),
1241 span_kind: Some("client".to_string()),
1242 },
1243 ],
1244 ),
1245 ];
1246
1247 for (records, contain_span_kind, expected) in tests {
1248 let operations = operations_from_records(records, contain_span_kind).unwrap();
1249 assert_eq!(operations, expected);
1250 }
1251 }
1252
1253 #[test]
1254 fn test_traces_from_records() {
1255 let tests = vec![(
1257 HttpRecordsOutput {
1258 schema: OutputSchema {
1259 column_schemas: vec![
1260 ColumnSchema {
1261 name: "trace_id".to_string(),
1262 data_type: "String".to_string(),
1263 },
1264 ColumnSchema {
1265 name: "timestamp".to_string(),
1266 data_type: "TimestampNanosecond".to_string(),
1267 },
1268 ColumnSchema {
1269 name: "duration_nano".to_string(),
1270 data_type: "UInt64".to_string(),
1271 },
1272 ColumnSchema {
1273 name: "service_name".to_string(),
1274 data_type: "String".to_string(),
1275 },
1276 ColumnSchema {
1277 name: "span_name".to_string(),
1278 data_type: "String".to_string(),
1279 },
1280 ColumnSchema {
1281 name: "span_id".to_string(),
1282 data_type: "String".to_string(),
1283 },
1284 ColumnSchema {
1285 name: "span_attributes".to_string(),
1286 data_type: "Json".to_string(),
1287 },
1288 ],
1289 },
1290 rows: vec![
1291 vec![
1292 JsonValue::String("5611dce1bc9ebed65352d99a027b08ea".to_string()),
1293 JsonValue::Number(Number::from_u128(1738726754492422000).unwrap()),
1294 JsonValue::Number(Number::from_u128(100000000).unwrap()),
1295 JsonValue::String("test-service-0".to_string()),
1296 JsonValue::String("access-mysql".to_string()),
1297 JsonValue::String("008421dbbd33a3e9".to_string()),
1298 JsonValue::Object(
1299 json!({
1300 "operation.type": "access-mysql",
1301 })
1302 .as_object()
1303 .unwrap()
1304 .clone(),
1305 ),
1306 ],
1307 vec![
1308 JsonValue::String("5611dce1bc9ebed65352d99a027b08ea".to_string()),
1309 JsonValue::Number(Number::from_u128(1738726754642422000).unwrap()),
1310 JsonValue::Number(Number::from_u128(100000000).unwrap()),
1311 JsonValue::String("test-service-0".to_string()),
1312 JsonValue::String("access-redis".to_string()),
1313 JsonValue::String("ffa03416a7b9ea48".to_string()),
1314 JsonValue::Object(
1315 json!({
1316 "operation.type": "access-redis",
1317 })
1318 .as_object()
1319 .unwrap()
1320 .clone(),
1321 ),
1322 ],
1323 ],
1324 total_rows: 2,
1325 metrics: HashMap::new(),
1326 },
1327 vec![Trace {
1328 trace_id: "5611dce1bc9ebed65352d99a027b08ea".to_string(),
1329 spans: vec![
1330 Span {
1331 trace_id: "5611dce1bc9ebed65352d99a027b08ea".to_string(),
1332 span_id: "008421dbbd33a3e9".to_string(),
1333 operation_name: "access-mysql".to_string(),
1334 start_time: 1738726754492422,
1335 duration: 100000,
1336 tags: vec![KeyValue {
1337 key: "operation.type".to_string(),
1338 value_type: ValueType::String,
1339 value: Value::String("access-mysql".to_string()),
1340 }],
1341 process_id: "p1".to_string(),
1342 ..Default::default()
1343 },
1344 Span {
1345 trace_id: "5611dce1bc9ebed65352d99a027b08ea".to_string(),
1346 span_id: "ffa03416a7b9ea48".to_string(),
1347 operation_name: "access-redis".to_string(),
1348 start_time: 1738726754642422,
1349 duration: 100000,
1350 tags: vec![KeyValue {
1351 key: "operation.type".to_string(),
1352 value_type: ValueType::String,
1353 value: Value::String("access-redis".to_string()),
1354 }],
1355 process_id: "p1".to_string(),
1356 ..Default::default()
1357 },
1358 ],
1359 processes: HashMap::from([(
1360 "p1".to_string(),
1361 Process {
1362 service_name: "test-service-0".to_string(),
1363 tags: vec![],
1364 },
1365 )]),
1366 ..Default::default()
1367 }],
1368 )];
1369
1370 for (records, expected) in tests {
1371 let traces = traces_from_records(records).unwrap();
1372 assert_eq!(traces, expected);
1373 }
1374 }
1375
1376 #[test]
1377 fn test_traces_from_v1_records() {
1378 let tests = vec![(
1380 HttpRecordsOutput {
1381 schema: OutputSchema {
1382 column_schemas: vec![
1383 ColumnSchema {
1384 name: "trace_id".to_string(),
1385 data_type: "String".to_string(),
1386 },
1387 ColumnSchema {
1388 name: "timestamp".to_string(),
1389 data_type: "TimestampNanosecond".to_string(),
1390 },
1391 ColumnSchema {
1392 name: "duration_nano".to_string(),
1393 data_type: "UInt64".to_string(),
1394 },
1395 ColumnSchema {
1396 name: "service_name".to_string(),
1397 data_type: "String".to_string(),
1398 },
1399 ColumnSchema {
1400 name: "span_name".to_string(),
1401 data_type: "String".to_string(),
1402 },
1403 ColumnSchema {
1404 name: "span_id".to_string(),
1405 data_type: "String".to_string(),
1406 },
1407 ColumnSchema {
1408 name: "span_attributes.http.request.method".to_string(),
1409 data_type: "String".to_string(),
1410 },
1411 ColumnSchema {
1412 name: "span_attributes.http.request.url".to_string(),
1413 data_type: "String".to_string(),
1414 },
1415 ColumnSchema {
1416 name: "span_attributes.http.status_code".to_string(),
1417 data_type: "UInt64".to_string(),
1418 },
1419 ],
1420 },
1421 rows: vec![
1422 vec![
1423 JsonValue::String("5611dce1bc9ebed65352d99a027b08ea".to_string()),
1424 JsonValue::Number(Number::from_u128(1738726754492422000).unwrap()),
1425 JsonValue::Number(Number::from_u128(100000000).unwrap()),
1426 JsonValue::String("test-service-0".to_string()),
1427 JsonValue::String("access-mysql".to_string()),
1428 JsonValue::String("008421dbbd33a3e9".to_string()),
1429 JsonValue::String("GET".to_string()),
1430 JsonValue::String("/data".to_string()),
1431 JsonValue::Number(Number::from_u128(200).unwrap()),
1432 ],
1433 vec![
1434 JsonValue::String("5611dce1bc9ebed65352d99a027b08ea".to_string()),
1435 JsonValue::Number(Number::from_u128(1738726754642422000).unwrap()),
1436 JsonValue::Number(Number::from_u128(100000000).unwrap()),
1437 JsonValue::String("test-service-0".to_string()),
1438 JsonValue::String("access-redis".to_string()),
1439 JsonValue::String("ffa03416a7b9ea48".to_string()),
1440 JsonValue::String("POST".to_string()),
1441 JsonValue::String("/create".to_string()),
1442 JsonValue::Number(Number::from_u128(400).unwrap()),
1443 ],
1444 ],
1445 total_rows: 2,
1446 metrics: HashMap::new(),
1447 },
1448 vec![Trace {
1449 trace_id: "5611dce1bc9ebed65352d99a027b08ea".to_string(),
1450 spans: vec![
1451 Span {
1452 trace_id: "5611dce1bc9ebed65352d99a027b08ea".to_string(),
1453 span_id: "008421dbbd33a3e9".to_string(),
1454 operation_name: "access-mysql".to_string(),
1455 start_time: 1738726754492422,
1456 duration: 100000,
1457 tags: vec![
1458 KeyValue {
1459 key: "http.request.method".to_string(),
1460 value_type: ValueType::String,
1461 value: Value::String("GET".to_string()),
1462 },
1463 KeyValue {
1464 key: "http.request.url".to_string(),
1465 value_type: ValueType::String,
1466 value: Value::String("/data".to_string()),
1467 },
1468 KeyValue {
1469 key: "http.status_code".to_string(),
1470 value_type: ValueType::Int64,
1471 value: Value::Int64(200),
1472 },
1473 ],
1474 process_id: "p1".to_string(),
1475 ..Default::default()
1476 },
1477 Span {
1478 trace_id: "5611dce1bc9ebed65352d99a027b08ea".to_string(),
1479 span_id: "ffa03416a7b9ea48".to_string(),
1480 operation_name: "access-redis".to_string(),
1481 start_time: 1738726754642422,
1482 duration: 100000,
1483 tags: vec![
1484 KeyValue {
1485 key: "http.request.method".to_string(),
1486 value_type: ValueType::String,
1487 value: Value::String("POST".to_string()),
1488 },
1489 KeyValue {
1490 key: "http.request.url".to_string(),
1491 value_type: ValueType::String,
1492 value: Value::String("/create".to_string()),
1493 },
1494 KeyValue {
1495 key: "http.status_code".to_string(),
1496 value_type: ValueType::Int64,
1497 value: Value::Int64(400),
1498 },
1499 ],
1500 process_id: "p1".to_string(),
1501 ..Default::default()
1502 },
1503 ],
1504 processes: HashMap::from([(
1505 "p1".to_string(),
1506 Process {
1507 service_name: "test-service-0".to_string(),
1508 tags: vec![],
1509 },
1510 )]),
1511 ..Default::default()
1512 }],
1513 )];
1514
1515 for (records, expected) in tests {
1516 let traces = traces_from_records(records).unwrap();
1517 assert_eq!(traces, expected);
1518 }
1519 }
1520
1521 #[test]
1522 fn test_from_jaeger_query_params() {
1523 let tests = vec![
1525 (
1526 JaegerQueryParams {
1527 service_name: Some("test-service-0".to_string()),
1528 ..Default::default()
1529 },
1530 QueryTraceParams {
1531 service_name: "test-service-0".to_string(),
1532 ..Default::default()
1533 },
1534 ),
1535 (
1536 JaegerQueryParams {
1537 service_name: Some("test-service-0".to_string()),
1538 operation_name: Some("access-mysql".to_string()),
1539 start: Some(1738726754492422),
1540 end: Some(1738726754642422),
1541 max_duration: Some("100ms".to_string()),
1542 min_duration: Some("50ms".to_string()),
1543 limit: Some(10),
1544 tags: Some("{\"http.status_code\":\"200\",\"latency\":\"11.234\",\"error\":\"false\",\"http.method\":\"GET\",\"http.path\":\"/api/v1/users\"}".to_string()),
1545 ..Default::default()
1546 },
1547 QueryTraceParams {
1548 service_name: "test-service-0".to_string(),
1549 operation_name: Some("access-mysql".to_string()),
1550 start_time: Some(1738726754492422000),
1551 end_time: Some(1738726754642422000),
1552 min_duration: Some(50000000),
1553 max_duration: Some(100000000),
1554 limit: Some(10),
1555 tags: Some(HashMap::from([
1556 ("http.status_code".to_string(), JsonValue::Number(Number::from(200))),
1557 ("latency".to_string(), JsonValue::Number(Number::from_f64(11.234).unwrap())),
1558 ("error".to_string(), JsonValue::Bool(false)),
1559 ("http.method".to_string(), JsonValue::String("GET".to_string())),
1560 ("http.path".to_string(), JsonValue::String("/api/v1/users".to_string())),
1561 ])),
1562 },
1563 ),
1564 ];
1565
1566 for (query_params, expected) in tests {
1567 let query_params = QueryTraceParams::from_jaeger_query_params(query_params).unwrap();
1568 assert_eq!(query_params, expected);
1569 }
1570 }
1571
1572 #[test]
1573 fn test_check_schema() {
1574 let tests = vec![(
1576 HttpRecordsOutput {
1577 schema: OutputSchema {
1578 column_schemas: vec![
1579 ColumnSchema {
1580 name: "trace_id".to_string(),
1581 data_type: "String".to_string(),
1582 },
1583 ColumnSchema {
1584 name: "timestamp".to_string(),
1585 data_type: "TimestampNanosecond".to_string(),
1586 },
1587 ColumnSchema {
1588 name: "duration_nano".to_string(),
1589 data_type: "UInt64".to_string(),
1590 },
1591 ColumnSchema {
1592 name: "service_name".to_string(),
1593 data_type: "String".to_string(),
1594 },
1595 ColumnSchema {
1596 name: "span_name".to_string(),
1597 data_type: "String".to_string(),
1598 },
1599 ColumnSchema {
1600 name: "span_id".to_string(),
1601 data_type: "String".to_string(),
1602 },
1603 ColumnSchema {
1604 name: "span_attributes".to_string(),
1605 data_type: "Json".to_string(),
1606 },
1607 ],
1608 },
1609 rows: vec![],
1610 total_rows: 0,
1611 metrics: HashMap::new(),
1612 },
1613 vec![
1614 (TRACE_ID_COLUMN, "String"),
1615 (TIMESTAMP_COLUMN, "TimestampNanosecond"),
1616 (DURATION_NANO_COLUMN, "UInt64"),
1617 (SERVICE_NAME_COLUMN, "String"),
1618 (SPAN_NAME_COLUMN, "String"),
1619 (SPAN_ID_COLUMN, "String"),
1620 (SPAN_ATTRIBUTES_COLUMN, "Json"),
1621 ],
1622 true,
1623 )];
1624
1625 for (records, expected_schema, is_ok) in tests {
1626 let result = check_schema(&records, &expected_schema);
1627 assert_eq!(result.is_ok(), is_ok);
1628 }
1629 }
1630
1631 #[test]
1632 fn test_normalize_span_kind() {
1633 let tests = vec![
1634 ("SPAN_KIND_SERVER".to_string(), "server".to_string()),
1635 ("SPAN_KIND_CLIENT".to_string(), "client".to_string()),
1636 ];
1637
1638 for (input, expected) in tests {
1639 let result = normalize_span_kind(&input);
1640 assert_eq!(result, expected);
1641 }
1642 }
1643
1644 #[test]
1645 fn test_convert_string_to_number() {
1646 let tests = vec![
1647 (
1648 JsonValue::String("123".to_string()),
1649 Some(JsonValue::Number(Number::from(123))),
1650 ),
1651 (
1652 JsonValue::String("123.456".to_string()),
1653 Some(JsonValue::Number(Number::from_f64(123.456).unwrap())),
1654 ),
1655 ];
1656
1657 for (input, expected) in tests {
1658 let result = convert_string_to_number(&input);
1659 assert_eq!(result, expected);
1660 }
1661 }
1662
1663 #[test]
1664 fn test_convert_string_to_boolean() {
1665 let tests = vec![
1666 (
1667 JsonValue::String("true".to_string()),
1668 Some(JsonValue::Bool(true)),
1669 ),
1670 (
1671 JsonValue::String("false".to_string()),
1672 Some(JsonValue::Bool(false)),
1673 ),
1674 ];
1675
1676 for (input, expected) in tests {
1677 let result = convert_string_to_boolean(&input);
1678 assert_eq!(result, expected);
1679 }
1680 }
1681}