1use std::collections::{BTreeMap, HashMap};
16use std::fmt;
17use std::str::FromStr;
18use std::sync::Arc;
19
20use axum::Extension;
21use axum::extract::{Path, Query, State};
22use axum::http::{HeaderMap, StatusCode as HttpStatusCode};
23use axum::response::IntoResponse;
24use common_catalog::consts::{PARENT_SPAN_ID_COLUMN, TRACE_TABLE_NAME};
25use common_error::ext::ErrorExt;
26use common_error::status_code::StatusCode;
27use common_query::{Output, OutputData};
28use common_recordbatch::util;
29use common_telemetry::{debug, error, tracing, warn};
30use serde::{Deserialize, Deserializer, Serialize, de};
31use serde_json::Value as JsonValue;
32use session::context::{Channel, QueryContext};
33use snafu::{OptionExt, ResultExt};
34
35use crate::error::{
36 CollectRecordbatchSnafu, Error, InvalidJaegerQuerySnafu, Result, status_code_to_http_status,
37};
38use crate::http::HttpRecordsOutput;
39use crate::http::extractor::TraceTableName;
40use crate::metrics::METRIC_JAEGER_QUERY_ELAPSED;
41use crate::otlp::trace::{
42 DURATION_NANO_COLUMN, KEY_OTEL_SCOPE_NAME, KEY_OTEL_SCOPE_VERSION, KEY_OTEL_STATUS_CODE,
43 KEY_OTEL_STATUS_ERROR_KEY, KEY_OTEL_STATUS_MESSAGE, KEY_OTEL_TRACE_STATE, KEY_SERVICE_NAME,
44 KEY_SPAN_KIND, RESOURCE_ATTRIBUTES_COLUMN, SCOPE_NAME_COLUMN, SCOPE_VERSION_COLUMN,
45 SERVICE_NAME_COLUMN, SPAN_ATTRIBUTES_COLUMN, SPAN_EVENTS_COLUMN, SPAN_ID_COLUMN,
46 SPAN_KIND_COLUMN, SPAN_KIND_PREFIX, SPAN_NAME_COLUMN, SPAN_STATUS_CODE, SPAN_STATUS_ERROR,
47 SPAN_STATUS_MESSAGE_COLUMN, SPAN_STATUS_PREFIX, SPAN_STATUS_UNSET, TIMESTAMP_COLUMN,
48 TRACE_ID_COLUMN, TRACE_STATE_COLUMN,
49};
50use crate::query_handler::JaegerQueryHandlerRef;
51
/// Key of the query-context extension under which a caller-supplied trace table name is
/// stored (see `update_query_context`).
pub const JAEGER_QUERY_TABLE_NAME_KEY: &str = "jaeger_query_table_name";

/// Reference type used for the reference built from a span's parent span id.
const REF_TYPE_CHILD_OF: &str = "CHILD_OF";
/// Formats tried, in order, when parsing the `time` string of a span event: a date-time with
/// 6 or 9 fractional-second digits followed by a UTC offset.
const SPAN_KIND_TIME_FMTS: [&str; 2] = ["%Y-%m-%d %H:%M:%S%.6f%z", "%Y-%m-%d %H:%M:%S%.9f%z"];
56
/// JSON envelope returned by every Jaeger query handler in this file.
///
/// Handlers fill `data` (and sometimes `total`) on success and `errors` on failure; all other
/// fields are left at their `Default` values.
#[derive(Default, Debug, Serialize, Deserialize, PartialEq)]
pub struct JaegerAPIResponse {
    /// Payload of a successful query; `None` when there is nothing to return.
    pub data: Option<JaegerData>,
    /// Number of returned items; set by the service and operation handlers, otherwise 0.
    pub total: usize,
    /// Never set by the handlers in this file (stays 0).
    pub limit: usize,
    /// Never set by the handlers in this file (stays 0).
    pub offset: usize,
    /// Errors reported to the client; empty on success.
    pub errors: Vec<JaegerAPIError>,
}
67
/// The different payloads a Jaeger response can carry.
///
/// The enum is `untagged`, so the selected variant is serialized as its bare inner array.
/// NOTE(review): `ServiceNames` and `OperationsNames` have the same shape, so deserialization
/// presumably cannot tell them apart and yields the first matching variant.
#[derive(Debug, Serialize, Deserialize, PartialEq)]
#[serde(untagged)]
pub enum JaegerData {
    /// List of service names.
    ServiceNames(Vec<String>),
    /// List of operation names only.
    OperationsNames(Vec<String>),
    /// List of operations, each optionally carrying its span kind.
    Operations(Vec<Operation>),
    /// List of traces with their spans and processes.
    Traces(Vec<Trace>),
}
77
/// A single error entry of a [`JaegerAPIResponse`]; field names are serialized in camelCase.
#[derive(Default, Debug, Serialize, Deserialize, PartialEq)]
#[serde(rename_all = "camelCase")]
pub struct JaegerAPIError {
    /// Numeric error code (an internal status code cast to `i32`, or an HTTP-like code such
    /// as 400 for a missing parameter).
    pub code: i32,
    /// Human-readable error message.
    pub msg: String,
    /// Trace the error relates to; omitted from the JSON when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub trace_id: Option<String>,
}
87
/// An operation (span name) of a service; field names are serialized in camelCase.
#[derive(Debug, Default, Serialize, Deserialize, PartialEq)]
#[serde(rename_all = "camelCase")]
pub struct Operation {
    /// Operation name, taken from the span name column.
    pub name: String,
    /// Normalized span kind; omitted from the JSON when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub span_kind: Option<String>,
}
96
/// A trace: all spans sharing one trace id plus the processes they reference.
#[derive(Debug, Default, Serialize, Deserialize, PartialEq)]
#[serde(rename_all = "camelCase")]
pub struct Trace {
    /// Trace identifier, serialized as `traceID`.
    #[serde(rename = "traceID")]
    pub trace_id: String,
    /// Spans that belong to this trace.
    pub spans: Vec<Span>,

    /// Processes keyed by process id (the value spans store in `process_id`); omitted from the
    /// JSON when empty.
    #[serde(skip_serializing_if = "HashMap::is_empty")]
    pub processes: HashMap<String, Process>,

    /// Warnings attached to the trace; omitted from the JSON when empty.
    #[serde(skip_serializing_if = "Vec::is_empty")]
    pub warnings: Vec<String>,
}
111
/// A single span in the Jaeger JSON model; field names are serialized in camelCase unless
/// renamed explicitly.
#[derive(Debug, Default, Serialize, Deserialize, PartialEq)]
#[serde(rename_all = "camelCase")]
pub struct Span {
    /// Identifier of the trace this span belongs to, serialized as `traceID`.
    #[serde(rename = "traceID")]
    pub trace_id: String,

    /// Identifier of this span, serialized as `spanID`.
    #[serde(rename = "spanID")]
    pub span_id: String,

    /// Serialized as `parentSpanID`; omitted from the JSON when empty.
    /// NOTE(review): `traces_from_records` never fills this field; the parent is reported
    /// through `references` instead.
    #[serde(rename = "parentSpanID")]
    #[serde(skip_serializing_if = "String::is_empty")]
    pub parent_span_id: String,

    /// Span flags; omitted from the JSON when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub flags: Option<u32>,

    /// Operation name, taken from the span name column.
    pub operation_name: String,
    /// References to other spans (a `CHILD_OF` reference to the parent span, if any).
    pub references: Vec<Reference>,
    /// Start time: the timestamp column value divided by 1000
    /// (presumably nanoseconds converted to microseconds; TODO confirm the column unit).
    pub start_time: u64,
    /// Duration: the nanosecond duration column value divided by 1000.
    pub duration: u64,
    /// Span tags, sorted by key by `traces_from_records`.
    pub tags: Vec<KeyValue>,
    /// Log entries built from the span events.
    pub logs: Vec<Log>,

    /// Id of the owning process inside [`Trace::processes`], serialized as `processID`;
    /// omitted from the JSON when empty.
    #[serde(rename = "processID")]
    #[serde(skip_serializing_if = "String::is_empty")]
    pub process_id: String,

    /// Inline process description; omitted from the JSON when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub process: Option<Process>,

    /// Warnings attached to the span; omitted from the JSON when empty.
    #[serde(skip_serializing_if = "Vec::is_empty")]
    pub warnings: Vec<String>,
}
146
/// A reference from one span to another span.
#[derive(Debug, Serialize, Deserialize, PartialEq)]
#[serde(rename_all = "camelCase")]
pub struct Reference {
    /// Trace id of the referenced span, serialized as `traceID`.
    #[serde(rename = "traceID")]
    pub trace_id: String,
    /// Span id of the referenced span, serialized as `spanID`.
    #[serde(rename = "spanID")]
    pub span_id: String,
    /// Kind of reference, e.g. `CHILD_OF`; serialized as `refType`.
    pub ref_type: String,
}
157
/// The process (service) that emitted one or more spans of a trace.
#[derive(Debug, Serialize, Deserialize, PartialEq)]
#[serde(rename_all = "camelCase")]
pub struct Process {
    /// Name of the service, serialized as `serviceName`.
    pub service_name: String,
    /// Process tags, built from the resource attributes of the service.
    pub tags: Vec<KeyValue>,
}
165
/// A log entry of a span, built from one span event.
#[derive(Debug, Serialize, Deserialize, PartialEq)]
#[serde(rename_all = "camelCase")]
pub struct Log {
    /// Event time in microseconds (filled from `timestamp_micros()` of the parsed event time).
    pub timestamp: u64,
    /// Fields of the entry: an `event` field with the event name plus the event attributes.
    pub fields: Vec<KeyValue>,
}
173
/// A typed key/value pair used for span tags, process tags and log fields.
#[derive(Debug, Serialize, Deserialize, PartialEq)]
#[serde(rename_all = "camelCase")]
pub struct KeyValue {
    /// Tag key.
    pub key: String,
    /// Declared type of `value`, serialized as `type`.
    #[serde(rename = "type")]
    pub value_type: ValueType,
    /// Tag value.
    pub value: Value,
}
183
/// Value of a [`KeyValue`].
///
/// The enum is `untagged`, so a value is serialized as the bare inner JSON value without any
/// variant name.
#[derive(Debug, Serialize, Deserialize, PartialEq)]
#[serde(untagged)]
#[serde(rename_all = "camelCase")]
pub enum Value {
    /// Text value.
    String(String),
    /// Signed 64-bit integer value.
    Int64(i64),
    /// 64-bit floating point value.
    Float64(f64),
    /// Boolean value.
    Boolean(bool),
    /// Raw bytes.
    Binary(Vec<u8>),
}
195
/// Type marker stored next to a [`Value`]; variants are serialized in lowercase
/// (`string`, `int64`, `float64`, `boolean`, `binary`).
#[derive(Debug, Serialize, Deserialize, PartialEq)]
#[serde(rename_all = "lowercase")]
pub enum ValueType {
    /// The value is a [`Value::String`].
    String,
    /// The value is a [`Value::Int64`].
    Int64,
    /// The value is a [`Value::Float64`].
    Float64,
    /// The value is a [`Value::Boolean`].
    Boolean,
    /// The value is a [`Value::Binary`].
    Binary,
}
206
/// Query-string parameters accepted by the Jaeger query handlers.
///
/// Field names are read in camelCase (e.g. `maxDuration`, `spanKind`) unless renamed.
#[derive(Default, Debug, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct JaegerQueryParams {
    /// Service name, read from the `service` parameter.
    #[serde(rename = "service")]
    pub service_name: Option<String>,

    /// Operation name, read from the `operation` parameter.
    #[serde(rename = "operation")]
    pub operation_name: Option<String>,

    /// Maximum number of traces to return; an empty string is treated as absent.
    #[serde(default, deserialize_with = "empty_string_as_none")]
    pub limit: Option<usize>,

    /// Start of the queried time range. The handlers multiply it by 1000 to obtain
    /// nanoseconds, i.e. it is expected in microseconds.
    pub start: Option<i64>,

    /// End of the queried time range, in the same unit as `start`.
    pub end: Option<i64>,

    /// Maximum span duration as a human-readable string parsed with
    /// `humantime::parse_duration`; an empty string is treated as absent.
    #[serde(default, deserialize_with = "empty_string_as_none")]
    pub max_duration: Option<String>,

    /// Minimum span duration, same format as `max_duration`; an empty string is treated as
    /// absent.
    #[serde(default, deserialize_with = "empty_string_as_none")]
    pub min_duration: Option<String>,

    /// Tag filter: a string holding a JSON object that maps tag keys to values.
    pub tags: Option<String>,

    /// Span kind filter, forwarded to the operations query.
    pub span_kind: Option<String>,
}
245
246fn empty_string_as_none<'de, D, T>(de: D) -> Result<Option<T>, D::Error>
248where
249 D: Deserializer<'de>,
250 T: FromStr,
251 T::Err: fmt::Display,
252{
253 let opt = Option::<String>::deserialize(de)?;
254 match opt.as_deref() {
255 None | Some("") => Ok(None),
256 Some(s) => FromStr::from_str(s).map_err(de::Error::custom).map(Some),
257 }
258}
259
260fn update_query_context(query_ctx: &mut QueryContext, table_name: Option<String>) {
261 query_ctx.set_channel(Channel::Jaeger);
263 if let Some(table) = table_name {
264 query_ctx.set_extension(JAEGER_QUERY_TABLE_NAME_KEY, table);
265 }
266}
267
268impl QueryTraceParams {
269 fn from_jaeger_query_params(query_params: JaegerQueryParams) -> Result<Self> {
270 let mut internal_query_params: QueryTraceParams = QueryTraceParams {
271 service_name: query_params.service_name.context(InvalidJaegerQuerySnafu {
272 reason: "service_name is required".to_string(),
273 })?,
274 operation_name: query_params.operation_name,
275 start_time: query_params.start.map(|start| start * 1000),
277 end_time: query_params.end.map(|end| end * 1000),
278 ..Default::default()
279 };
280
281 if let Some(max_duration) = query_params.max_duration {
282 let duration = humantime::parse_duration(&max_duration).map_err(|e| {
283 InvalidJaegerQuerySnafu {
284 reason: format!("parse maxDuration '{}' failed: {}", max_duration, e),
285 }
286 .build()
287 })?;
288 internal_query_params.max_duration = Some(duration.as_nanos() as u64);
289 }
290
291 if let Some(min_duration) = query_params.min_duration {
292 let duration = humantime::parse_duration(&min_duration).map_err(|e| {
293 InvalidJaegerQuerySnafu {
294 reason: format!("parse minDuration '{}' failed: {}", min_duration, e),
295 }
296 .build()
297 })?;
298 internal_query_params.min_duration = Some(duration.as_nanos() as u64);
299 }
300
301 if let Some(tags) = query_params.tags {
302 let mut tags_map: HashMap<String, JsonValue> =
304 serde_json::from_str(&tags).map_err(|e| {
305 InvalidJaegerQuerySnafu {
306 reason: format!("parse tags '{}' failed: {}", tags, e),
307 }
308 .build()
309 })?;
310 for (_, v) in tags_map.iter_mut() {
311 if let Some(number) = convert_string_to_number(v) {
312 *v = number;
313 }
314 if let Some(boolean) = convert_string_to_boolean(v) {
315 *v = boolean;
316 }
317 }
318 internal_query_params.tags = Some(tags_map);
319 }
320
321 internal_query_params.limit = query_params.limit;
322
323 Ok(internal_query_params)
324 }
325}
326
/// Internal, validated form of a Jaeger trace search, built by
/// `QueryTraceParams::from_jaeger_query_params`.
#[derive(Debug, Default, PartialEq)]
pub struct QueryTraceParams {
    /// Service whose traces are searched (mandatory).
    pub service_name: String,
    /// Optional operation (span name) filter.
    pub operation_name: Option<String>,

    /// Optional maximum number of traces to return.
    pub limit: Option<usize>,

    /// Optional tag filter; values are JSON strings, numbers or booleans.
    pub tags: Option<HashMap<String, JsonValue>>,

    /// Start of the time range: the request's `start` multiplied by 1000 (nanoseconds).
    pub start_time: Option<i64>,
    /// End of the time range: the request's `end` multiplied by 1000 (nanoseconds).
    pub end_time: Option<i64>,
    /// Minimum span duration in nanoseconds.
    pub min_duration: Option<u64>,
    /// Maximum span duration in nanoseconds.
    pub max_duration: Option<u64>,
}
344
345#[axum_macros::debug_handler]
347#[tracing::instrument(skip_all, fields(protocol = "jaeger", request_type = "get_services"))]
348pub async fn handle_get_services(
349 State(handler): State<JaegerQueryHandlerRef>,
350 Query(query_params): Query<JaegerQueryParams>,
351 Extension(mut query_ctx): Extension<QueryContext>,
352 TraceTableName(table_name): TraceTableName,
353) -> impl IntoResponse {
354 debug!(
355 "Received Jaeger '/api/services' request, query_params: {:?}, query_ctx: {:?}",
356 query_params, query_ctx
357 );
358
359 query_ctx.set_channel(Channel::Jaeger);
360 if let Some(table) = table_name {
361 query_ctx.set_extension(JAEGER_QUERY_TABLE_NAME_KEY, table);
362 }
363
364 let query_ctx = Arc::new(query_ctx);
365 let db = query_ctx.get_db_string();
366
367 let _timer = METRIC_JAEGER_QUERY_ELAPSED
369 .with_label_values(&[&db, "/api/services"])
370 .start_timer();
371
372 match handler.get_services(query_ctx).await {
373 Ok(output) => match covert_to_records(output).await {
374 Ok(Some(records)) => match services_from_records(records) {
375 Ok(services) => {
376 let services_num = services.len();
377 (
378 HttpStatusCode::OK,
379 axum::Json(JaegerAPIResponse {
380 data: Some(JaegerData::ServiceNames(services)),
381 total: services_num,
382 ..Default::default()
383 }),
384 )
385 }
386 Err(err) => {
387 error!("Failed to get services: {:?}", err);
388 error_response(err)
389 }
390 },
391 Ok(None) => (HttpStatusCode::OK, axum::Json(JaegerAPIResponse::default())),
392 Err(err) => {
393 error!("Failed to get services: {:?}", err);
394 error_response(err)
395 }
396 },
397 Err(err) => handle_query_error(err, "Failed to get services", &db),
398 }
399}
400
401#[axum_macros::debug_handler]
403#[tracing::instrument(skip_all, fields(protocol = "jaeger", request_type = "get_trace"))]
404pub async fn handle_get_trace(
405 State(handler): State<JaegerQueryHandlerRef>,
406 Path(trace_id): Path<String>,
407 Query(query_params): Query<JaegerQueryParams>,
408 Extension(mut query_ctx): Extension<QueryContext>,
409 TraceTableName(table_name): TraceTableName,
410) -> impl IntoResponse {
411 debug!(
412 "Received Jaeger '/api/traces/{}' request, query_params: {:?}, query_ctx: {:?}",
413 trace_id, query_params, query_ctx
414 );
415
416 update_query_context(&mut query_ctx, table_name);
417 let query_ctx = Arc::new(query_ctx);
418 let db = query_ctx.get_db_string();
419
420 let _timer = METRIC_JAEGER_QUERY_ELAPSED
422 .with_label_values(&[&db, "/api/traces"])
423 .start_timer();
424
425 let start_time_ns = query_params.start.map(|start_us| start_us * 1000);
427 let end_time_ns = query_params.end.map(|end_us| end_us * 1000);
428
429 let output = match handler
430 .get_trace(query_ctx, &trace_id, start_time_ns, end_time_ns)
431 .await
432 {
433 Ok(output) => output,
434 Err(err) => {
435 return handle_query_error(
436 err,
437 &format!("Failed to get trace for '{}'", trace_id),
438 &db,
439 );
440 }
441 };
442
443 match covert_to_records(output).await {
444 Ok(Some(records)) => match traces_from_records(records) {
445 Ok(traces) => (
446 HttpStatusCode::OK,
447 axum::Json(JaegerAPIResponse {
448 data: Some(JaegerData::Traces(traces)),
449 ..Default::default()
450 }),
451 ),
452 Err(err) => {
453 error!("Failed to get trace '{}': {:?}", trace_id, err);
454 error_response(err)
455 }
456 },
457 Ok(None) => (HttpStatusCode::OK, axum::Json(JaegerAPIResponse::default())),
458 Err(err) => {
459 error!("Failed to get trace '{}': {:?}", trace_id, err);
460 error_response(err)
461 }
462 }
463}
464
465#[axum_macros::debug_handler]
467#[tracing::instrument(skip_all, fields(protocol = "jaeger", request_type = "find_traces"))]
468pub async fn handle_find_traces(
469 State(handler): State<JaegerQueryHandlerRef>,
470 Query(query_params): Query<JaegerQueryParams>,
471 Extension(mut query_ctx): Extension<QueryContext>,
472 TraceTableName(table_name): TraceTableName,
473) -> impl IntoResponse {
474 debug!(
475 "Received Jaeger '/api/traces' request, query_params: {:?}, query_ctx: {:?}",
476 query_params, query_ctx
477 );
478
479 update_query_context(&mut query_ctx, table_name);
480 let query_ctx = Arc::new(query_ctx);
481 let db = query_ctx.get_db_string();
482
483 let _timer = METRIC_JAEGER_QUERY_ELAPSED
485 .with_label_values(&[&db, "/api/traces"])
486 .start_timer();
487
488 match QueryTraceParams::from_jaeger_query_params(query_params) {
489 Ok(query_params) => {
490 let output = handler.find_traces(query_ctx, query_params).await;
491 match output {
492 Ok(output) => match covert_to_records(output).await {
493 Ok(Some(records)) => match traces_from_records(records) {
494 Ok(traces) => (
495 HttpStatusCode::OK,
496 axum::Json(JaegerAPIResponse {
497 data: Some(JaegerData::Traces(traces)),
498 ..Default::default()
499 }),
500 ),
501 Err(err) => {
502 error!("Failed to find traces: {:?}", err);
503 error_response(err)
504 }
505 },
506 Ok(None) => (HttpStatusCode::OK, axum::Json(JaegerAPIResponse::default())),
507 Err(err) => error_response(err),
508 },
509 Err(err) => handle_query_error(err, "Failed to find traces", &db),
510 }
511 }
512 Err(e) => error_response(e),
513 }
514}
515
516#[axum_macros::debug_handler]
518#[tracing::instrument(skip_all, fields(protocol = "jaeger", request_type = "get_operations"))]
519pub async fn handle_get_operations(
520 State(handler): State<JaegerQueryHandlerRef>,
521 Query(query_params): Query<JaegerQueryParams>,
522 Extension(mut query_ctx): Extension<QueryContext>,
523 TraceTableName(table_name): TraceTableName,
524 headers: HeaderMap,
525) -> impl IntoResponse {
526 debug!(
527 "Received Jaeger '/api/operations' request, query_params: {:?}, query_ctx: {:?}, headers: {:?}",
528 query_params, query_ctx, headers
529 );
530
531 if let Some(service_name) = &query_params.service_name {
532 update_query_context(&mut query_ctx, table_name);
533 let query_ctx = Arc::new(query_ctx);
534 let db = query_ctx.get_db_string();
535
536 let _timer = METRIC_JAEGER_QUERY_ELAPSED
538 .with_label_values(&[&db, "/api/operations"])
539 .start_timer();
540
541 match handler
542 .get_operations(query_ctx, service_name, query_params.span_kind.as_deref())
543 .await
544 {
545 Ok(output) => match covert_to_records(output).await {
546 Ok(Some(records)) => match operations_from_records(records, true) {
547 Ok(operations) => {
548 let total = operations.len();
549 (
550 HttpStatusCode::OK,
551 axum::Json(JaegerAPIResponse {
552 data: Some(JaegerData::Operations(operations)),
553 total,
554 ..Default::default()
555 }),
556 )
557 }
558 Err(err) => {
559 error!("Failed to get operations: {:?}", err);
560 error_response(err)
561 }
562 },
563 Ok(None) => (HttpStatusCode::OK, axum::Json(JaegerAPIResponse::default())),
564 Err(err) => error_response(err),
565 },
566 Err(err) => handle_query_error(
567 err,
568 &format!("Failed to get operations for service '{}'", service_name),
569 &db,
570 ),
571 }
572 } else {
573 (
574 HttpStatusCode::BAD_REQUEST,
575 axum::Json(JaegerAPIResponse {
576 errors: vec![JaegerAPIError {
577 code: 400,
578 msg: "parameter 'service' is required".to_string(),
579 trace_id: None,
580 }],
581 ..Default::default()
582 }),
583 )
584 }
585}
586
587#[axum_macros::debug_handler]
589#[tracing::instrument(
590 skip_all,
591 fields(protocol = "jaeger", request_type = "get_operations_by_service")
592)]
593pub async fn handle_get_operations_by_service(
594 State(handler): State<JaegerQueryHandlerRef>,
595 Path(service_name): Path<String>,
596 Query(query_params): Query<JaegerQueryParams>,
597 Extension(mut query_ctx): Extension<QueryContext>,
598 TraceTableName(table_name): TraceTableName,
599 headers: HeaderMap,
600) -> impl IntoResponse {
601 debug!(
602 "Received Jaeger '/api/services/{}/operations' request, query_params: {:?}, query_ctx: {:?}, headers: {:?}",
603 service_name, query_params, query_ctx, headers
604 );
605
606 update_query_context(&mut query_ctx, table_name);
607 let query_ctx = Arc::new(query_ctx);
608 let db = query_ctx.get_db_string();
609
610 let _timer = METRIC_JAEGER_QUERY_ELAPSED
612 .with_label_values(&[&db, "/api/services"])
613 .start_timer();
614
615 match handler.get_operations(query_ctx, &service_name, None).await {
616 Ok(output) => match covert_to_records(output).await {
617 Ok(Some(records)) => match operations_from_records(records, false) {
618 Ok(operations) => {
619 let operations: Vec<String> =
620 operations.into_iter().map(|op| op.name).collect();
621 let total = operations.len();
622 (
623 HttpStatusCode::OK,
624 axum::Json(JaegerAPIResponse {
625 data: Some(JaegerData::OperationsNames(operations)),
626 total,
627 ..Default::default()
628 }),
629 )
630 }
631 Err(err) => {
632 error!(
633 "Failed to get operations for service '{}': {:?}",
634 service_name, err
635 );
636 error_response(err)
637 }
638 },
639 Ok(None) => (HttpStatusCode::OK, axum::Json(JaegerAPIResponse::default())),
640 Err(err) => error_response(err),
641 },
642 Err(err) => handle_query_error(
643 err,
644 &format!("Failed to get operations for service '{}'", service_name),
645 &db,
646 ),
647 }
648}
649
650async fn covert_to_records(output: Output) -> Result<Option<HttpRecordsOutput>> {
651 match output.data {
652 OutputData::Stream(stream) => {
653 let records = HttpRecordsOutput::try_new(
654 stream.schema().clone(),
655 util::collect(stream)
656 .await
657 .context(CollectRecordbatchSnafu)?,
658 )?;
659 debug!(
660 "The query records: {}",
661 serde_json::to_string(&records).unwrap()
662 );
663 Ok(Some(records))
664 }
665 _ => Ok(None),
667 }
668}
669
670fn handle_query_error(
671 err: Error,
672 prompt: &str,
673 db: &str,
674) -> (HttpStatusCode, axum::Json<JaegerAPIResponse>) {
675 if err.status_code() == StatusCode::TableNotFound {
677 warn!(
678 "No trace table '{}' found in database '{}'",
679 TRACE_TABLE_NAME, db
680 );
681 (HttpStatusCode::OK, axum::Json(JaegerAPIResponse::default()))
682 } else {
683 error!("{}: {:?}", prompt, err);
684 error_response(err)
685 }
686}
687
688fn error_response(err: Error) -> (HttpStatusCode, axum::Json<JaegerAPIResponse>) {
689 (
690 status_code_to_http_status(&err.status_code()),
691 axum::Json(JaegerAPIResponse {
692 errors: vec![JaegerAPIError {
693 code: err.status_code() as i32,
694 msg: err.to_string(),
695 ..Default::default()
696 }],
697 ..Default::default()
698 }),
699 )
700}
701
702fn traces_from_records(records: HttpRecordsOutput) -> Result<Vec<Trace>> {
703 let mut trace_id_to_processes: HashMap<String, HashMap<String, String>> = HashMap::new();
705 let mut trace_id_to_spans: BTreeMap<String, Vec<Span>> = BTreeMap::new();
708 let mut service_to_resource_attributes: HashMap<String, Vec<KeyValue>> = HashMap::new();
710
711 let is_span_attributes_flatten = !records
712 .schema
713 .column_schemas
714 .iter()
715 .any(|c| c.name == SPAN_ATTRIBUTES_COLUMN);
716
717 for row in records.rows.into_iter() {
718 let mut span = Span::default();
719 let mut service_name = None;
720 let mut resource_tags = vec![];
721
722 for (idx, cell) in row.into_iter().enumerate() {
723 let column_name = &records.schema.column_schemas[idx].name;
725
726 match column_name.as_str() {
727 TRACE_ID_COLUMN => {
728 if let JsonValue::String(trace_id) = cell {
729 span.trace_id = trace_id.clone();
730 trace_id_to_processes.entry(trace_id).or_default();
731 }
732 }
733 TIMESTAMP_COLUMN => {
734 span.start_time = cell.as_u64().context(InvalidJaegerQuerySnafu {
735 reason: "Failed to convert timestamp to u64".to_string(),
736 })? / 1000;
737 }
738 DURATION_NANO_COLUMN => {
739 span.duration = cell.as_u64().context(InvalidJaegerQuerySnafu {
740 reason: "Failed to convert duration to u64".to_string(),
741 })? / 1000;
742 }
743 SERVICE_NAME_COLUMN => {
744 if let JsonValue::String(name) = cell {
745 service_name = Some(name);
746 }
747 }
748 SPAN_NAME_COLUMN => {
749 if let JsonValue::String(span_name) = cell {
750 span.operation_name = span_name;
751 }
752 }
753 SPAN_ID_COLUMN => {
754 if let JsonValue::String(span_id) = cell {
755 span.span_id = span_id;
756 }
757 }
758 SPAN_ATTRIBUTES_COLUMN => {
759 if let JsonValue::Object(span_attrs) = cell {
762 span.tags.extend(object_to_tags(span_attrs));
763 }
764 }
765 RESOURCE_ATTRIBUTES_COLUMN => {
766 if let JsonValue::Object(mut resource_attrs) = cell {
770 resource_attrs.remove(KEY_SERVICE_NAME);
771 resource_tags = object_to_tags(resource_attrs);
772 }
773 }
774 PARENT_SPAN_ID_COLUMN => {
775 if let JsonValue::String(parent_span_id) = cell
776 && !parent_span_id.is_empty()
777 {
778 span.references.push(Reference {
779 trace_id: span.trace_id.clone(),
780 span_id: parent_span_id,
781 ref_type: REF_TYPE_CHILD_OF.to_string(),
782 });
783 }
784 }
785 SPAN_EVENTS_COLUMN => {
786 if let JsonValue::Array(events) = cell {
787 for event in events {
788 if let JsonValue::Object(mut obj) = event {
789 let Some(action) = obj.get("name").and_then(|v| v.as_str()) else {
790 continue;
791 };
792
793 let Some(t) =
794 obj.get("time").and_then(|t| t.as_str()).and_then(|s| {
795 SPAN_KIND_TIME_FMTS
796 .iter()
797 .find_map(|fmt| {
798 chrono::DateTime::parse_from_str(s, fmt).ok()
799 })
800 .map(|dt| dt.timestamp_micros() as u64)
801 })
802 else {
803 continue;
804 };
805
806 let mut fields = vec![KeyValue {
807 key: "event".to_string(),
808 value_type: ValueType::String,
809 value: Value::String(action.to_string()),
810 }];
811
812 if let Some(JsonValue::Object(attrs)) = obj.remove("attributes") {
814 fields.extend(object_to_tags(attrs));
815 }
816
817 span.logs.push(Log {
818 timestamp: t,
819 fields,
820 });
821 }
822 }
823 }
824 }
825 SCOPE_NAME_COLUMN => {
826 if let JsonValue::String(scope_name) = cell
827 && !scope_name.is_empty()
828 {
829 span.tags.push(KeyValue {
830 key: KEY_OTEL_SCOPE_NAME.to_string(),
831 value_type: ValueType::String,
832 value: Value::String(scope_name),
833 });
834 }
835 }
836 SCOPE_VERSION_COLUMN => {
837 if let JsonValue::String(scope_version) = cell
838 && !scope_version.is_empty()
839 {
840 span.tags.push(KeyValue {
841 key: KEY_OTEL_SCOPE_VERSION.to_string(),
842 value_type: ValueType::String,
843 value: Value::String(scope_version),
844 });
845 }
846 }
847 SPAN_KIND_COLUMN => {
848 if let JsonValue::String(span_kind) = cell
849 && !span_kind.is_empty()
850 {
851 span.tags.push(KeyValue {
852 key: KEY_SPAN_KIND.to_string(),
853 value_type: ValueType::String,
854 value: Value::String(normalize_span_kind(&span_kind)),
855 });
856 }
857 }
858 SPAN_STATUS_CODE => {
859 if let JsonValue::String(span_status) = cell
860 && span_status != SPAN_STATUS_UNSET
861 && !span_status.is_empty()
862 {
863 span.tags.push(KeyValue {
864 key: KEY_OTEL_STATUS_CODE.to_string(),
865 value_type: ValueType::String,
866 value: Value::String(normalize_status_code(&span_status)),
867 });
868 if span_status == SPAN_STATUS_ERROR {
870 span.tags.push(KeyValue {
871 key: KEY_OTEL_STATUS_ERROR_KEY.to_string(),
872 value_type: ValueType::Boolean,
873 value: Value::Boolean(true),
874 });
875 }
876 }
877 }
878
879 SPAN_STATUS_MESSAGE_COLUMN => {
880 if let JsonValue::String(span_status_message) = cell
881 && !span_status_message.is_empty()
882 {
883 span.tags.push(KeyValue {
884 key: KEY_OTEL_STATUS_MESSAGE.to_string(),
885 value_type: ValueType::String,
886 value: Value::String(span_status_message),
887 });
888 }
889 }
890
891 TRACE_STATE_COLUMN => {
892 if let JsonValue::String(trace_state) = cell
893 && !trace_state.is_empty()
894 {
895 span.tags.push(KeyValue {
896 key: KEY_OTEL_TRACE_STATE.to_string(),
897 value_type: ValueType::String,
898 value: Value::String(trace_state),
899 });
900 }
901 }
902
903 _ => {
904 if is_span_attributes_flatten {
906 const SPAN_ATTR_PREFIX: &str = "span_attributes.";
907 const RESOURCE_ATTR_PREFIX: &str = "resource_attributes.";
908 if column_name.starts_with(SPAN_ATTR_PREFIX) {
910 if let Some(keyvalue) = to_keyvalue(
911 column_name
912 .strip_prefix(SPAN_ATTR_PREFIX)
913 .unwrap_or_default()
914 .to_string(),
915 cell,
916 ) {
917 span.tags.push(keyvalue);
918 }
919 } else if column_name.starts_with(RESOURCE_ATTR_PREFIX)
920 && let Some(keyvalue) = to_keyvalue(
921 column_name
922 .strip_prefix(RESOURCE_ATTR_PREFIX)
923 .unwrap_or_default()
924 .to_string(),
925 cell,
926 )
927 {
928 resource_tags.push(keyvalue);
929 }
930 }
931 }
932 }
933 }
934
935 if let Some(service_name) = service_name {
936 if !service_to_resource_attributes.contains_key(&service_name) {
937 service_to_resource_attributes.insert(service_name.clone(), resource_tags);
938 }
939
940 if let Some(process) = trace_id_to_processes.get_mut(&span.trace_id) {
941 if let Some(process_id) = process.get(&service_name) {
942 span.process_id = process_id.clone();
943 } else {
944 let process_id = format!("p{}", process.len() + 1);
946 process.insert(service_name, process_id.clone());
947 span.process_id = process_id;
948 }
949 }
950 }
951
952 span.tags.sort_by(|a, b| a.key.cmp(&b.key));
954
955 if let Some(spans) = trace_id_to_spans.get_mut(&span.trace_id) {
956 spans.push(span);
957 } else {
958 trace_id_to_spans.insert(span.trace_id.clone(), vec![span]);
959 }
960 }
961
962 let mut traces = Vec::new();
963 for (trace_id, spans) in trace_id_to_spans {
964 let mut trace = Trace {
965 trace_id,
966 spans,
967 ..Default::default()
968 };
969
970 if let Some(processes) = trace_id_to_processes.remove(&trace.trace_id) {
971 let mut process_id_to_process = HashMap::new();
972 for (service_name, process_id) in processes.into_iter() {
973 let tags = service_to_resource_attributes
974 .remove(&service_name)
975 .unwrap_or_default();
976 process_id_to_process.insert(process_id, Process { service_name, tags });
977 }
978 trace.processes = process_id_to_process;
979 }
980 traces.push(trace);
981 }
982
983 Ok(traces)
984}
985
986fn to_keyvalue(key: String, value: JsonValue) -> Option<KeyValue> {
987 match value {
988 JsonValue::String(value) => Some(KeyValue {
989 key,
990 value_type: ValueType::String,
991 value: Value::String(value.clone()),
992 }),
993 JsonValue::Number(value) => Some(KeyValue {
994 key,
995 value_type: ValueType::Int64,
996 value: Value::Int64(value.as_i64().unwrap_or(0)),
997 }),
998 JsonValue::Bool(value) => Some(KeyValue {
999 key,
1000 value_type: ValueType::Boolean,
1001 value: Value::Boolean(value),
1002 }),
1003 JsonValue::Array(value) => Some(KeyValue {
1004 key,
1005 value_type: ValueType::String,
1006 value: Value::String(serde_json::to_string(&value).unwrap()),
1007 }),
1008 JsonValue::Object(value) => Some(KeyValue {
1009 key,
1010 value_type: ValueType::String,
1011 value: Value::String(serde_json::to_string(&value).unwrap()),
1012 }),
1013 JsonValue::Null => None,
1014 }
1015}
1016
1017fn object_to_tags(object: serde_json::map::Map<String, JsonValue>) -> Vec<KeyValue> {
1018 object
1019 .into_iter()
1020 .filter_map(|(key, value)| to_keyvalue(key, value))
1021 .collect()
1022}
1023
1024fn services_from_records(records: HttpRecordsOutput) -> Result<Vec<String>> {
1025 let expected_schema = vec![(SERVICE_NAME_COLUMN, "String")];
1026 check_schema(&records, &expected_schema)?;
1027
1028 let mut services = Vec::with_capacity(records.total_rows);
1029 for row in records.rows.into_iter() {
1030 for value in row.into_iter() {
1031 if let JsonValue::String(service_name) = value {
1032 services.push(service_name);
1033 }
1034 }
1035 }
1036 Ok(services)
1037}
1038
1039fn operations_from_records(
1041 records: HttpRecordsOutput,
1042 contain_span_kind: bool,
1043) -> Result<Vec<Operation>> {
1044 let expected_schema = vec![(SPAN_NAME_COLUMN, "String"), (SPAN_KIND_COLUMN, "String")];
1045 check_schema(&records, &expected_schema)?;
1046
1047 let mut operations = Vec::with_capacity(records.total_rows);
1048 for row in records.rows.into_iter() {
1049 let mut row_iter = row.into_iter();
1050 if let Some(JsonValue::String(operation)) = row_iter.next() {
1051 let mut operation = Operation {
1052 name: operation,
1053 span_kind: None,
1054 };
1055 if contain_span_kind {
1056 if let Some(JsonValue::String(span_kind)) = row_iter.next() {
1057 operation.span_kind = Some(normalize_span_kind(&span_kind));
1058 }
1059 } else {
1060 row_iter.next();
1062 }
1063 operations.push(operation);
1064 }
1065 }
1066
1067 Ok(operations)
1068}
1069
1070fn check_schema(records: &HttpRecordsOutput, expected_schema: &[(&str, &str)]) -> Result<()> {
1072 for (i, column) in records.schema.column_schemas.iter().enumerate() {
1073 if column.name != expected_schema[i].0 || column.data_type != expected_schema[i].1 {
1074 InvalidJaegerQuerySnafu {
1075 reason: "query result schema is not correct".to_string(),
1076 }
1077 .fail()?
1078 }
1079 }
1080 Ok(())
1081}
1082
1083fn normalize_span_kind(span_kind: &str) -> String {
1086 if let Some(stripped) = span_kind.strip_prefix(SPAN_KIND_PREFIX) {
1088 stripped.to_lowercase()
1089 } else {
1090 span_kind.to_lowercase()
1092 }
1093}
1094
1095fn normalize_status_code(status_code: &str) -> String {
1098 if let Some(stripped) = status_code.strip_prefix(SPAN_STATUS_PREFIX) {
1100 stripped.to_string()
1101 } else {
1102 status_code.to_string()
1104 }
1105}
1106
1107fn convert_string_to_number(input: &serde_json::Value) -> Option<serde_json::Value> {
1108 if let Some(data) = input.as_str() {
1109 if let Ok(number) = data.parse::<i64>() {
1110 return Some(serde_json::Value::Number(serde_json::Number::from(number)));
1111 }
1112 if let Ok(number) = data.parse::<f64>()
1113 && let Some(number) = serde_json::Number::from_f64(number)
1114 {
1115 return Some(serde_json::Value::Number(number));
1116 }
1117 }
1118
1119 None
1120}
1121
1122fn convert_string_to_boolean(input: &serde_json::Value) -> Option<serde_json::Value> {
1123 if let Some(data) = input.as_str() {
1124 if data == "true" {
1125 return Some(serde_json::Value::Bool(true));
1126 }
1127 if data == "false" {
1128 return Some(serde_json::Value::Bool(false));
1129 }
1130 }
1131
1132 None
1133}
1134
1135#[cfg(test)]
1136mod tests {
1137 use serde_json::{Number, Value as JsonValue, json};
1138
1139 use super::*;
1140 use crate::http::{ColumnSchema, HttpRecordsOutput, OutputSchema};
1141
1142 #[test]
1143 fn test_services_from_records() {
1144 let tests = vec![(
1146 HttpRecordsOutput {
1147 schema: OutputSchema {
1148 column_schemas: vec![ColumnSchema {
1149 name: "service_name".to_string(),
1150 data_type: "String".to_string(),
1151 }],
1152 },
1153 rows: vec![
1154 vec![JsonValue::String("test-service-0".to_string())],
1155 vec![JsonValue::String("test-service-1".to_string())],
1156 ],
1157 total_rows: 2,
1158 metrics: HashMap::new(),
1159 },
1160 vec!["test-service-0".to_string(), "test-service-1".to_string()],
1161 )];
1162
1163 for (records, expected) in tests {
1164 let services = services_from_records(records).unwrap();
1165 assert_eq!(services, expected);
1166 }
1167 }
1168
// The same two-row result set is converted twice: once without span kinds
// (every `span_kind` is `None`) and once with them (the `SPAN_KIND_*` values
// come back as "server" / "client").
#[test]
fn test_operations_from_records() {
    // Both cases consume an identical fixture, so it is rebuilt on demand.
    let build_records = || HttpRecordsOutput {
        schema: OutputSchema {
            column_schemas: ["span_name", "span_kind"]
                .into_iter()
                .map(|name| ColumnSchema {
                    name: name.to_string(),
                    data_type: "String".to_string(),
                })
                .collect(),
        },
        rows: [
            ("access-mysql", "SPAN_KIND_SERVER"),
            ("access-redis", "SPAN_KIND_CLIENT"),
        ]
        .into_iter()
        .map(|(span_name, span_kind)| {
            vec![
                JsonValue::String(span_name.to_string()),
                JsonValue::String(span_kind.to_string()),
            ]
        })
        .collect(),
        total_rows: 2,
        metrics: HashMap::new(),
    };
    let operation = |name: &str, span_kind: Option<&str>| Operation {
        name: name.to_string(),
        span_kind: span_kind.map(str::to_owned),
    };

    let without_span_kind = operations_from_records(build_records(), false).unwrap();
    assert_eq!(
        without_span_kind,
        vec![
            operation("access-mysql", None),
            operation("access-redis", None),
        ]
    );

    let with_span_kind = operations_from_records(build_records(), true).unwrap();
    assert_eq!(
        with_span_kind,
        vec![
            operation("access-mysql", Some("server")),
            operation("access-redis", Some("client")),
        ]
    );
}
1258
// Two rows sharing one trace id and one service, with attributes stored in a
// single JSON `span_attributes` column, should become one `Trace` holding two
// spans and a single process entry "p1".
#[test]
fn test_traces_from_records() {
    const TRACE_ID: &str = "5611dce1bc9ebed65352d99a027b08ea";
    const SERVICE: &str = "test-service-0";

    let column = |name: &str, data_type: &str| ColumnSchema {
        name: name.to_string(),
        data_type: data_type.to_string(),
    };
    let text = |value: &str| JsonValue::String(value.to_string());
    let number = |value: u128| JsonValue::Number(Number::from_u128(value).unwrap());

    let records = HttpRecordsOutput {
        schema: OutputSchema {
            column_schemas: vec![
                column("trace_id", "String"),
                column("timestamp", "TimestampNanosecond"),
                column("duration_nano", "UInt64"),
                column("service_name", "String"),
                column("span_name", "String"),
                column("span_id", "String"),
                column("span_attributes", "Json"),
            ],
        },
        rows: vec![
            vec![
                text(TRACE_ID),
                number(1738726754492422000),
                number(100000000),
                text(SERVICE),
                text("access-mysql"),
                text("008421dbbd33a3e9"),
                json!({ "operation.type": "access-mysql" }),
            ],
            vec![
                text(TRACE_ID),
                number(1738726754642422000),
                number(100000000),
                text(SERVICE),
                text("access-redis"),
                text("ffa03416a7b9ea48"),
                json!({ "operation.type": "access-redis" }),
            ],
        ],
        total_rows: 2,
        metrics: HashMap::new(),
    };

    // In this fixture the `operation.type` attribute carries the same text as the span name.
    let span = |span_id: &str, operation: &str, start_time| Span {
        trace_id: TRACE_ID.to_string(),
        span_id: span_id.to_string(),
        operation_name: operation.to_string(),
        start_time,
        duration: 100000,
        tags: vec![KeyValue {
            key: "operation.type".to_string(),
            value_type: ValueType::String,
            value: Value::String(operation.to_string()),
        }],
        process_id: "p1".to_string(),
        ..Default::default()
    };

    // Expected start times and durations are the row values divided by 1000.
    let expected = vec![Trace {
        trace_id: TRACE_ID.to_string(),
        spans: vec![
            span("008421dbbd33a3e9", "access-mysql", 1738726754492422),
            span("ffa03416a7b9ea48", "access-redis", 1738726754642422),
        ],
        processes: HashMap::from([(
            "p1".to_string(),
            Process {
                service_name: SERVICE.to_string(),
                tags: Vec::new(),
            },
        )]),
        ..Default::default()
    }];

    let traces = traces_from_records(records).unwrap();
    assert_eq!(traces, expected);
}
1381
// Same scenario as the JSON-attribute test, but each span attribute arrives in
// its own `span_attributes.<key>` column. The expected tags use the key
// without that prefix, and the numeric status code becomes an `Int64` tag.
#[test]
fn test_traces_from_v1_records() {
    const TRACE_ID: &str = "5611dce1bc9ebed65352d99a027b08ea";
    const SERVICE: &str = "test-service-0";

    let column = |name: &str, data_type: &str| ColumnSchema {
        name: name.to_string(),
        data_type: data_type.to_string(),
    };
    let text = |value: &str| JsonValue::String(value.to_string());
    let number = |value: u128| JsonValue::Number(Number::from_u128(value).unwrap());

    let records = HttpRecordsOutput {
        schema: OutputSchema {
            column_schemas: vec![
                column("trace_id", "String"),
                column("timestamp", "TimestampNanosecond"),
                column("duration_nano", "UInt64"),
                column("service_name", "String"),
                column("span_name", "String"),
                column("span_id", "String"),
                column("span_attributes.http.request.method", "String"),
                column("span_attributes.http.request.url", "String"),
                column("span_attributes.http.status_code", "UInt64"),
            ],
        },
        rows: vec![
            vec![
                text(TRACE_ID),
                number(1738726754492422000),
                number(100000000),
                text(SERVICE),
                text("access-mysql"),
                text("008421dbbd33a3e9"),
                text("GET"),
                text("/data"),
                number(200),
            ],
            vec![
                text(TRACE_ID),
                number(1738726754642422000),
                number(100000000),
                text(SERVICE),
                text("access-redis"),
                text("ffa03416a7b9ea48"),
                text("POST"),
                text("/create"),
                number(400),
            ],
        ],
        total_rows: 2,
        metrics: HashMap::new(),
    };

    let string_tag = |key: &str, value: &str| KeyValue {
        key: key.to_string(),
        value_type: ValueType::String,
        value: Value::String(value.to_string()),
    };
    let span = |span_id: &str, operation: &str, start_time, method: &str, url: &str, status| Span {
        trace_id: TRACE_ID.to_string(),
        span_id: span_id.to_string(),
        operation_name: operation.to_string(),
        start_time,
        duration: 100000,
        tags: vec![
            string_tag("http.request.method", method),
            string_tag("http.request.url", url),
            KeyValue {
                key: "http.status_code".to_string(),
                value_type: ValueType::Int64,
                value: Value::Int64(status),
            },
        ],
        process_id: "p1".to_string(),
        ..Default::default()
    };

    let expected = vec![Trace {
        trace_id: TRACE_ID.to_string(),
        spans: vec![
            span(
                "008421dbbd33a3e9",
                "access-mysql",
                1738726754492422,
                "GET",
                "/data",
                200,
            ),
            span(
                "ffa03416a7b9ea48",
                "access-redis",
                1738726754642422,
                "POST",
                "/create",
                400,
            ),
        ],
        processes: HashMap::from([(
            "p1".to_string(),
            Process {
                service_name: SERVICE.to_string(),
                tags: Vec::new(),
            },
        )]),
        ..Default::default()
    }];

    let traces = traces_from_records(records).unwrap();
    assert_eq!(traces, expected);
}
1526
// Checks the conversion from the HTTP-level `JaegerQueryParams` into
// `QueryTraceParams`, first with only a service name and then with every
// field populated.
#[test]
fn test_from_jaeger_query_params() {
    // Case 1: only the service name is given; everything else stays at its default.
    let service_only = QueryTraceParams::from_jaeger_query_params(JaegerQueryParams {
        service_name: Some("test-service-0".to_string()),
        ..Default::default()
    })
    .unwrap();
    assert_eq!(
        service_only,
        QueryTraceParams {
            service_name: "test-service-0".to_string(),
            ..Default::default()
        }
    );

    // Case 2: all fields set. Every tag value is supplied as a JSON string.
    let raw_tags = r#"{"http.status_code":"200","latency":"11.234","error":"false","http.method":"GET","http.path":"/api/v1/users"}"#;
    let full_query = JaegerQueryParams {
        service_name: Some("test-service-0".to_string()),
        operation_name: Some("access-mysql".to_string()),
        start: Some(1738726754492422),
        end: Some(1738726754642422),
        max_duration: Some("100ms".to_string()),
        min_duration: Some("50ms".to_string()),
        limit: Some(10),
        tags: Some(raw_tags.to_string()),
        ..Default::default()
    };

    // Numeric and boolean tag strings are expected to come back as typed JSON values.
    let expected_tags: HashMap<String, JsonValue> = [
        ("http.status_code", json!(200)),
        ("latency", json!(11.234)),
        ("error", json!(false)),
        ("http.method", json!("GET")),
        ("http.path", json!("/api/v1/users")),
    ]
    .into_iter()
    .map(|(key, value)| (key.to_string(), value))
    .collect();

    // Expected start/end are the inputs times 1000; "50ms"/"100ms" become 50_000_000/100_000_000.
    let expected = QueryTraceParams {
        service_name: "test-service-0".to_string(),
        operation_name: Some("access-mysql".to_string()),
        start_time: Some(1738726754492422000),
        end_time: Some(1738726754642422000),
        min_duration: Some(50000000),
        max_duration: Some(100000000),
        limit: Some(10),
        tags: Some(expected_tags),
    };

    let converted = QueryTraceParams::from_jaeger_query_params(full_query).unwrap();
    assert_eq!(converted, expected);
}
1577
// An empty result set whose column names and types line up with the expected
// schema (written with the column-name constants) must pass `check_schema`.
#[test]
fn test_check_schema() {
    let column = |name: &str, data_type: &str| ColumnSchema {
        name: name.to_string(),
        data_type: data_type.to_string(),
    };

    // Column names are spelled out literally here, independent of the constants below.
    let records = HttpRecordsOutput {
        schema: OutputSchema {
            column_schemas: vec![
                column("trace_id", "String"),
                column("timestamp", "TimestampNanosecond"),
                column("duration_nano", "UInt64"),
                column("service_name", "String"),
                column("span_name", "String"),
                column("span_id", "String"),
                column("span_attributes", "Json"),
            ],
        },
        rows: Vec::new(),
        total_rows: 0,
        metrics: HashMap::new(),
    };

    let expected_schema = vec![
        (TRACE_ID_COLUMN, "String"),
        (TIMESTAMP_COLUMN, "TimestampNanosecond"),
        (DURATION_NANO_COLUMN, "UInt64"),
        (SERVICE_NAME_COLUMN, "String"),
        (SPAN_NAME_COLUMN, "String"),
        (SPAN_ID_COLUMN, "String"),
        (SPAN_ATTRIBUTES_COLUMN, "Json"),
    ];

    let result = check_schema(&records, &expected_schema);
    assert!(result.is_ok());
}
1636
// `normalize_span_kind` should map the `SPAN_KIND_*` names to their short
// lowercase form.
#[test]
fn test_normalize_span_kind() {
    let cases = [
        ("SPAN_KIND_SERVER", "server"),
        ("SPAN_KIND_CLIENT", "client"),
    ];

    for (raw, normalized) in cases {
        let input = raw.to_string();
        let result = normalize_span_kind(&input);
        assert_eq!(result, normalized.to_string());
    }
}
1649
// Covers both the successful conversions and every path on which
// `convert_string_to_number` must leave the value alone by returning `None`.
#[test]
fn test_convert_string_to_number() {
    let tests = vec![
        // Integral text becomes an integer number.
        (
            JsonValue::String("123".to_string()),
            Some(JsonValue::Number(Number::from(123))),
        ),
        // Fractional text becomes a float number.
        (
            JsonValue::String("123.456".to_string()),
            Some(JsonValue::Number(Number::from_f64(123.456).unwrap())),
        ),
        // Negative integers are handled by the `i64` parse.
        (
            JsonValue::String("-42".to_string()),
            Some(JsonValue::Number(Number::from(-42i64))),
        ),
        // Text that is not numeric is not converted.
        (JsonValue::String("abc".to_string()), None),
        (JsonValue::String(String::new()), None),
        // "NaN" parses as an `f64` but is not representable as a JSON number.
        (JsonValue::String("NaN".to_string()), None),
        // Inputs that are not JSON strings are never converted.
        (JsonValue::Bool(true), None),
        (JsonValue::Number(Number::from(7)), None),
        (JsonValue::Null, None),
    ];

    for (input, expected) in tests {
        let result = convert_string_to_number(&input);
        assert_eq!(result, expected, "input: {input:?}");
    }
}
1668
// Covers the two recognised literals and the inputs that
// `convert_string_to_boolean` must reject by returning `None`.
#[test]
fn test_convert_string_to_boolean() {
    let tests = vec![
        (
            JsonValue::String("true".to_string()),
            Some(JsonValue::Bool(true)),
        ),
        (
            JsonValue::String("false".to_string()),
            Some(JsonValue::Bool(false)),
        ),
        // The match is exact and case-sensitive.
        (JsonValue::String("TRUE".to_string()), None),
        (JsonValue::String("False".to_string()), None),
        // Other truthy-looking text is not a boolean.
        (JsonValue::String("yes".to_string()), None),
        (JsonValue::String("1".to_string()), None),
        (JsonValue::String(String::new()), None),
        // Inputs that are not JSON strings are never converted.
        (JsonValue::Bool(true), None),
        (JsonValue::Number(Number::from(1)), None),
        (JsonValue::Null, None),
    ];

    for (input, expected) in tests {
        let result = convert_string_to_boolean(&input);
        assert_eq!(result, expected, "input: {input:?}");
    }
}
1687}