1use std::collections::{BTreeMap, HashMap};
16use std::fmt;
17use std::str::FromStr;
18use std::sync::Arc;
19
20use axum::Extension;
21use axum::extract::{Path, Query, State};
22use axum::http::{HeaderMap, StatusCode as HttpStatusCode};
23use axum::response::IntoResponse;
24use axum_extra::TypedHeader;
25use common_catalog::consts::{PARENT_SPAN_ID_COLUMN, TRACE_TABLE_NAME};
26use common_error::ext::ErrorExt;
27use common_error::status_code::StatusCode;
28use common_query::{Output, OutputData};
29use common_recordbatch::util;
30use common_telemetry::{debug, error, tracing, warn};
31use headers::UserAgent;
32use serde::{Deserialize, Deserializer, Serialize, de};
33use serde_json::Value as JsonValue;
34use session::context::{Channel, QueryContext};
35use snafu::{OptionExt, ResultExt};
36
37use crate::error::{
38 CollectRecordbatchSnafu, Error, InvalidJaegerQuerySnafu, Result, status_code_to_http_status,
39};
40use crate::http::HttpRecordsOutput;
41use crate::http::extractor::TraceTableName;
42use crate::metrics::METRIC_JAEGER_QUERY_ELAPSED;
43use crate::otlp::trace::{
44 DURATION_NANO_COLUMN, KEY_OTEL_SCOPE_NAME, KEY_OTEL_SCOPE_VERSION, KEY_OTEL_STATUS_CODE,
45 KEY_OTEL_STATUS_ERROR_KEY, KEY_OTEL_STATUS_MESSAGE, KEY_OTEL_TRACE_STATE, KEY_SERVICE_NAME,
46 KEY_SPAN_KIND, RESOURCE_ATTRIBUTES_COLUMN, SCOPE_NAME_COLUMN, SCOPE_VERSION_COLUMN,
47 SERVICE_NAME_COLUMN, SPAN_ATTRIBUTES_COLUMN, SPAN_EVENTS_COLUMN, SPAN_ID_COLUMN,
48 SPAN_KIND_COLUMN, SPAN_KIND_PREFIX, SPAN_NAME_COLUMN, SPAN_STATUS_CODE, SPAN_STATUS_ERROR,
49 SPAN_STATUS_MESSAGE_COLUMN, SPAN_STATUS_PREFIX, SPAN_STATUS_UNSET, TIMESTAMP_COLUMN,
50 TRACE_ID_COLUMN, TRACE_STATE_COLUMN,
51};
52use crate::query_handler::JaegerQueryHandlerRef;
53
54pub const JAEGER_QUERY_TABLE_NAME_KEY: &str = "jaeger_query_table_name";
55
56const REF_TYPE_CHILD_OF: &str = "CHILD_OF";
57const SPAN_KIND_TIME_FMTS: [&str; 2] = ["%Y-%m-%d %H:%M:%S%.6f%z", "%Y-%m-%d %H:%M:%S%.9f%z"];
58
59const TRACE_NOT_FOUND_ERROR_CODE: i32 = 404;
60const TRACE_NOT_FOUND_ERROR_MSG: &str = "trace not found";
61
62#[derive(Default, Debug, Serialize, Deserialize, PartialEq)]
65pub struct JaegerAPIResponse {
66 pub data: Option<JaegerData>,
67 pub total: usize,
68 pub limit: usize,
69 pub offset: usize,
70 pub errors: Vec<JaegerAPIError>,
71}
72
73impl JaegerAPIResponse {
74 pub fn trace_not_found() -> Self {
75 Self {
76 data: None,
77 total: 0,
78 limit: 0,
79 offset: 0,
80 errors: vec![JaegerAPIError {
81 code: TRACE_NOT_FOUND_ERROR_CODE,
82 msg: TRACE_NOT_FOUND_ERROR_MSG.to_string(),
83 trace_id: None,
84 }],
85 }
86 }
87}
88
89#[derive(Debug, Serialize, Deserialize, PartialEq)]
91#[serde(untagged)]
92pub enum JaegerData {
93 ServiceNames(Vec<String>),
94 OperationsNames(Vec<String>),
95 Operations(Vec<Operation>),
96 Traces(Vec<Trace>),
97}
98
99#[derive(Default, Debug, Serialize, Deserialize, PartialEq)]
101#[serde(rename_all = "camelCase")]
102pub struct JaegerAPIError {
103 pub code: i32,
104 pub msg: String,
105 #[serde(skip_serializing_if = "Option::is_none")]
106 pub trace_id: Option<String>,
107}
108
109#[derive(Debug, Default, Serialize, Deserialize, PartialEq)]
111#[serde(rename_all = "camelCase")]
112pub struct Operation {
113 pub name: String,
114 #[serde(skip_serializing_if = "Option::is_none")]
115 pub span_kind: Option<String>,
116}
117
118#[derive(Debug, Default, Serialize, Deserialize, PartialEq)]
120#[serde(rename_all = "camelCase")]
121pub struct Trace {
122 #[serde(rename = "traceID")]
123 pub trace_id: String,
124 pub spans: Vec<Span>,
125
126 #[serde(skip_serializing_if = "HashMap::is_empty")]
127 pub processes: HashMap<String, Process>,
128
129 #[serde(skip_serializing_if = "Vec::is_empty")]
130 pub warnings: Vec<String>,
131}
132
133#[derive(Debug, Default, Serialize, Deserialize, PartialEq)]
135#[serde(rename_all = "camelCase")]
136pub struct Span {
137 #[serde(rename = "traceID")]
138 pub trace_id: String,
139
140 #[serde(rename = "spanID")]
141 pub span_id: String,
142
143 #[serde(rename = "parentSpanID")]
144 #[serde(skip_serializing_if = "String::is_empty")]
145 pub parent_span_id: String,
146
147 #[serde(skip_serializing_if = "Option::is_none")]
148 pub flags: Option<u32>,
149
150 pub operation_name: String,
151 pub references: Vec<Reference>,
152 pub start_time: u64, pub duration: u64, pub tags: Vec<KeyValue>,
155 pub logs: Vec<Log>,
156
157 #[serde(rename = "processID")]
158 #[serde(skip_serializing_if = "String::is_empty")]
159 pub process_id: String,
160
161 #[serde(skip_serializing_if = "Option::is_none")]
162 pub process: Option<Process>,
163
164 #[serde(skip_serializing_if = "Vec::is_empty")]
165 pub warnings: Vec<String>,
166}
167
168#[derive(Debug, Serialize, Deserialize, PartialEq)]
170#[serde(rename_all = "camelCase")]
171pub struct Reference {
172 #[serde(rename = "traceID")]
173 pub trace_id: String,
174 #[serde(rename = "spanID")]
175 pub span_id: String,
176 pub ref_type: String,
177}
178
179#[derive(Debug, Serialize, Deserialize, PartialEq)]
181#[serde(rename_all = "camelCase")]
182pub struct Process {
183 pub service_name: String,
184 pub tags: Vec<KeyValue>,
185}
186
187#[derive(Debug, Serialize, Deserialize, PartialEq)]
189#[serde(rename_all = "camelCase")]
190pub struct Log {
191 pub timestamp: u64,
192 pub fields: Vec<KeyValue>,
193}
194
195#[derive(Debug, Serialize, Deserialize, PartialEq)]
197#[serde(rename_all = "camelCase")]
198pub struct KeyValue {
199 pub key: String,
200 #[serde(rename = "type")]
201 pub value_type: ValueType,
202 pub value: Value,
203}
204
205#[derive(Debug, Serialize, Deserialize, PartialEq)]
207#[serde(untagged)]
208#[serde(rename_all = "camelCase")]
209pub enum Value {
210 String(String),
211 Int64(i64),
212 Float64(f64),
213 Boolean(bool),
214 Binary(Vec<u8>),
215}
216
217#[derive(Debug, Serialize, Deserialize, PartialEq)]
219#[serde(rename_all = "lowercase")]
220pub enum ValueType {
221 String,
222 Int64,
223 Float64,
224 Boolean,
225 Binary,
226}
227
228#[derive(Default, Debug, Serialize, Deserialize)]
230#[serde(rename_all = "camelCase")]
231pub struct JaegerQueryParams {
232 #[serde(rename = "service")]
234 pub service_name: Option<String>,
235
236 #[serde(rename = "operation")]
238 pub operation_name: Option<String>,
239
240 #[serde(default, deserialize_with = "empty_string_as_none")]
242 pub limit: Option<usize>,
243
244 pub start: Option<i64>,
246
247 pub end: Option<i64>,
249
250 #[serde(default, deserialize_with = "empty_string_as_none")]
252 pub max_duration: Option<String>,
253
254 #[serde(default, deserialize_with = "empty_string_as_none")]
256 pub min_duration: Option<String>,
257
258 pub tags: Option<String>,
262
263 pub span_kind: Option<String>,
265}
266
267fn empty_string_as_none<'de, D, T>(de: D) -> Result<Option<T>, D::Error>
269where
270 D: Deserializer<'de>,
271 T: FromStr,
272 T::Err: fmt::Display,
273{
274 let opt = Option::<String>::deserialize(de)?;
275 match opt.as_deref() {
276 None | Some("") => Ok(None),
277 Some(s) => FromStr::from_str(s).map_err(de::Error::custom).map(Some),
278 }
279}
280
281fn update_query_context(query_ctx: &mut QueryContext, table_name: Option<String>) {
282 query_ctx.set_channel(Channel::Jaeger);
284 if let Some(table) = table_name {
285 query_ctx.set_extension(JAEGER_QUERY_TABLE_NAME_KEY, table);
286 }
287}
288
289impl QueryTraceParams {
290 fn from_jaeger_query_params(query_params: JaegerQueryParams) -> Result<Self> {
291 let mut internal_query_params: QueryTraceParams = QueryTraceParams {
292 service_name: query_params.service_name.context(InvalidJaegerQuerySnafu {
293 reason: "service_name is required".to_string(),
294 })?,
295 operation_name: query_params.operation_name,
296 start_time: query_params.start.map(|start| start * 1000),
298 end_time: query_params.end.map(|end| end * 1000),
299 ..Default::default()
300 };
301
302 if let Some(max_duration) = query_params.max_duration {
303 let duration = humantime::parse_duration(&max_duration).map_err(|e| {
304 InvalidJaegerQuerySnafu {
305 reason: format!("parse maxDuration '{}' failed: {}", max_duration, e),
306 }
307 .build()
308 })?;
309 internal_query_params.max_duration = Some(duration.as_nanos() as u64);
310 }
311
312 if let Some(min_duration) = query_params.min_duration {
313 let duration = humantime::parse_duration(&min_duration).map_err(|e| {
314 InvalidJaegerQuerySnafu {
315 reason: format!("parse minDuration '{}' failed: {}", min_duration, e),
316 }
317 .build()
318 })?;
319 internal_query_params.min_duration = Some(duration.as_nanos() as u64);
320 }
321
322 if let Some(tags) = query_params.tags {
323 let mut tags_map: HashMap<String, JsonValue> =
325 serde_json::from_str(&tags).map_err(|e| {
326 InvalidJaegerQuerySnafu {
327 reason: format!("parse tags '{}' failed: {}", tags, e),
328 }
329 .build()
330 })?;
331 for (_, v) in tags_map.iter_mut() {
332 if let Some(number) = convert_string_to_number(v) {
333 *v = number;
334 }
335 if let Some(boolean) = convert_string_to_boolean(v) {
336 *v = boolean;
337 }
338 }
339 internal_query_params.tags = Some(tags_map);
340 }
341
342 internal_query_params.limit = query_params.limit;
343
344 Ok(internal_query_params)
345 }
346}
347
348#[derive(Debug, Default, PartialEq)]
349pub struct QueryTraceParams {
350 pub service_name: String,
351 pub operation_name: Option<String>,
352
353 pub limit: Option<usize>,
355
356 pub tags: Option<HashMap<String, JsonValue>>,
358
359 pub start_time: Option<i64>,
361 pub end_time: Option<i64>,
362 pub min_duration: Option<u64>,
363 pub max_duration: Option<u64>,
364
365 pub user_agent: TraceUserAgent,
367}
368
369#[derive(Debug, Default, PartialEq, Eq)]
370pub enum TraceUserAgent {
371 Grafana,
372 #[default]
375 Jaeger,
376}
377
378impl From<UserAgent> for TraceUserAgent {
379 fn from(value: UserAgent) -> Self {
380 let ua_str = value.as_str().to_lowercase();
381 debug!("received user agent: {}", ua_str);
382 if ua_str.contains("grafana") {
383 Self::Grafana
384 } else {
385 Self::Jaeger
386 }
387 }
388}
389
390#[axum_macros::debug_handler]
392#[tracing::instrument(skip_all, fields(protocol = "jaeger", request_type = "get_services"))]
393pub async fn handle_get_services(
394 State(handler): State<JaegerQueryHandlerRef>,
395 Query(query_params): Query<JaegerQueryParams>,
396 Extension(mut query_ctx): Extension<QueryContext>,
397 TraceTableName(table_name): TraceTableName,
398) -> impl IntoResponse {
399 debug!(
400 "Received Jaeger '/api/services' request, query_params: {:?}, query_ctx: {:?}",
401 query_params, query_ctx
402 );
403
404 query_ctx.set_channel(Channel::Jaeger);
405 if let Some(table) = table_name {
406 query_ctx.set_extension(JAEGER_QUERY_TABLE_NAME_KEY, table);
407 }
408
409 let query_ctx = Arc::new(query_ctx);
410 let db = query_ctx.get_db_string();
411
412 let _timer = METRIC_JAEGER_QUERY_ELAPSED
414 .with_label_values(&[&db, "/api/services"])
415 .start_timer();
416
417 match handler.get_services(query_ctx).await {
418 Ok(output) => match covert_to_records(output).await {
419 Ok(Some(records)) => match services_from_records(records) {
420 Ok(services) => {
421 let services_num = services.len();
422 (
423 HttpStatusCode::OK,
424 axum::Json(JaegerAPIResponse {
425 data: Some(JaegerData::ServiceNames(services)),
426 total: services_num,
427 ..Default::default()
428 }),
429 )
430 }
431 Err(err) => {
432 error!("Failed to get services: {:?}", err);
433 error_response(err)
434 }
435 },
436 Ok(None) => (HttpStatusCode::OK, axum::Json(JaegerAPIResponse::default())),
437 Err(err) => {
438 error!("Failed to get services: {:?}", err);
439 error_response(err)
440 }
441 },
442 Err(err) => handle_query_error(err, "Failed to get services", &db),
443 }
444}
445
446#[axum_macros::debug_handler]
448#[tracing::instrument(skip_all, fields(protocol = "jaeger", request_type = "get_trace"))]
449pub async fn handle_get_trace(
450 State(handler): State<JaegerQueryHandlerRef>,
451 Path(trace_id): Path<String>,
452 Query(query_params): Query<JaegerQueryParams>,
453 Extension(mut query_ctx): Extension<QueryContext>,
454 TraceTableName(table_name): TraceTableName,
455) -> impl IntoResponse {
456 debug!(
457 "Received Jaeger '/api/traces/{}' request, query_params: {:?}, query_ctx: {:?}",
458 trace_id, query_params, query_ctx
459 );
460
461 update_query_context(&mut query_ctx, table_name);
462 let query_ctx = Arc::new(query_ctx);
463 let db = query_ctx.get_db_string();
464
465 let _timer = METRIC_JAEGER_QUERY_ELAPSED
467 .with_label_values(&[&db, "/api/traces"])
468 .start_timer();
469
470 let start_time_ns = query_params.start.map(|start_us| start_us * 1000);
472 let end_time_ns = query_params.end.map(|end_us| end_us * 1000);
473
474 let output = match handler
475 .get_trace(
476 query_ctx,
477 &trace_id,
478 start_time_ns,
479 end_time_ns,
480 query_params.limit,
481 )
482 .await
483 {
484 Ok(output) => output,
485 Err(err) => {
486 return handle_query_error(
487 err,
488 &format!("Failed to get trace for '{}'", trace_id),
489 &db,
490 );
491 }
492 };
493
494 match covert_to_records(output).await {
495 Ok(Some(records)) => match traces_from_records(records) {
496 Ok(traces) if traces.is_empty() => (
497 HttpStatusCode::NOT_FOUND,
498 axum::Json(JaegerAPIResponse::trace_not_found()),
499 ),
500 Ok(traces) => (
501 HttpStatusCode::OK,
502 axum::Json(JaegerAPIResponse {
503 data: Some(JaegerData::Traces(traces)),
504 ..Default::default()
505 }),
506 ),
507 Err(err) => {
508 error!("Failed to get trace '{}': {:?}", trace_id, err);
509 error_response(err)
510 }
511 },
512 Ok(None) => (
513 HttpStatusCode::NOT_FOUND,
514 axum::Json(JaegerAPIResponse::trace_not_found()),
515 ),
516 Err(err) => {
517 error!("Failed to get trace '{}': {:?}", trace_id, err);
518 error_response(err)
519 }
520 }
521}
522
523#[axum_macros::debug_handler]
525#[tracing::instrument(skip_all, fields(protocol = "jaeger", request_type = "find_traces"))]
526pub async fn handle_find_traces(
527 State(handler): State<JaegerQueryHandlerRef>,
528 Query(query_params): Query<JaegerQueryParams>,
529 Extension(mut query_ctx): Extension<QueryContext>,
530 TraceTableName(table_name): TraceTableName,
531 optional_user_agent: Option<TypedHeader<UserAgent>>,
532) -> impl IntoResponse {
533 debug!(
534 "Received Jaeger '/api/traces' request, query_params: {:?}, query_ctx: {:?}",
535 query_params, query_ctx
536 );
537
538 update_query_context(&mut query_ctx, table_name);
539 let query_ctx = Arc::new(query_ctx);
540 let db = query_ctx.get_db_string();
541
542 let _timer = METRIC_JAEGER_QUERY_ELAPSED
544 .with_label_values(&[&db, "/api/traces"])
545 .start_timer();
546
547 match QueryTraceParams::from_jaeger_query_params(query_params) {
548 Ok(mut query_params) => {
549 if let Some(TypedHeader(user_agent)) = optional_user_agent {
550 query_params.user_agent = user_agent.into();
551 }
552 let output = handler.find_traces(query_ctx, query_params).await;
553 match output {
554 Ok(output) => match covert_to_records(output).await {
555 Ok(Some(records)) => match traces_from_records(records) {
556 Ok(traces) => (
557 HttpStatusCode::OK,
558 axum::Json(JaegerAPIResponse {
559 data: Some(JaegerData::Traces(traces)),
560 ..Default::default()
561 }),
562 ),
563 Err(err) => {
564 error!("Failed to find traces: {:?}", err);
565 error_response(err)
566 }
567 },
568 Ok(None) => (HttpStatusCode::OK, axum::Json(JaegerAPIResponse::default())),
569 Err(err) => error_response(err),
570 },
571 Err(err) => handle_query_error(err, "Failed to find traces", &db),
572 }
573 }
574 Err(e) => error_response(e),
575 }
576}
577
578#[axum_macros::debug_handler]
580#[tracing::instrument(skip_all, fields(protocol = "jaeger", request_type = "get_operations"))]
581pub async fn handle_get_operations(
582 State(handler): State<JaegerQueryHandlerRef>,
583 Query(query_params): Query<JaegerQueryParams>,
584 Extension(mut query_ctx): Extension<QueryContext>,
585 TraceTableName(table_name): TraceTableName,
586 headers: HeaderMap,
587) -> impl IntoResponse {
588 debug!(
589 "Received Jaeger '/api/operations' request, query_params: {:?}, query_ctx: {:?}, headers: {:?}",
590 query_params, query_ctx, headers
591 );
592
593 if let Some(service_name) = &query_params.service_name {
594 update_query_context(&mut query_ctx, table_name);
595 let query_ctx = Arc::new(query_ctx);
596 let db = query_ctx.get_db_string();
597
598 let _timer = METRIC_JAEGER_QUERY_ELAPSED
600 .with_label_values(&[&db, "/api/operations"])
601 .start_timer();
602
603 match handler
604 .get_operations(query_ctx, service_name, query_params.span_kind.as_deref())
605 .await
606 {
607 Ok(output) => match covert_to_records(output).await {
608 Ok(Some(records)) => match operations_from_records(records, true) {
609 Ok(operations) => {
610 let total = operations.len();
611 (
612 HttpStatusCode::OK,
613 axum::Json(JaegerAPIResponse {
614 data: Some(JaegerData::Operations(operations)),
615 total,
616 ..Default::default()
617 }),
618 )
619 }
620 Err(err) => {
621 error!("Failed to get operations: {:?}", err);
622 error_response(err)
623 }
624 },
625 Ok(None) => (HttpStatusCode::OK, axum::Json(JaegerAPIResponse::default())),
626 Err(err) => error_response(err),
627 },
628 Err(err) => handle_query_error(
629 err,
630 &format!("Failed to get operations for service '{}'", service_name),
631 &db,
632 ),
633 }
634 } else {
635 (
636 HttpStatusCode::BAD_REQUEST,
637 axum::Json(JaegerAPIResponse {
638 errors: vec![JaegerAPIError {
639 code: 400,
640 msg: "parameter 'service' is required".to_string(),
641 trace_id: None,
642 }],
643 ..Default::default()
644 }),
645 )
646 }
647}
648
649#[axum_macros::debug_handler]
651#[tracing::instrument(
652 skip_all,
653 fields(protocol = "jaeger", request_type = "get_operations_by_service")
654)]
655pub async fn handle_get_operations_by_service(
656 State(handler): State<JaegerQueryHandlerRef>,
657 Path(service_name): Path<String>,
658 Query(query_params): Query<JaegerQueryParams>,
659 Extension(mut query_ctx): Extension<QueryContext>,
660 TraceTableName(table_name): TraceTableName,
661 headers: HeaderMap,
662) -> impl IntoResponse {
663 debug!(
664 "Received Jaeger '/api/services/{}/operations' request, query_params: {:?}, query_ctx: {:?}, headers: {:?}",
665 service_name, query_params, query_ctx, headers
666 );
667
668 update_query_context(&mut query_ctx, table_name);
669 let query_ctx = Arc::new(query_ctx);
670 let db = query_ctx.get_db_string();
671
672 let _timer = METRIC_JAEGER_QUERY_ELAPSED
674 .with_label_values(&[&db, "/api/services"])
675 .start_timer();
676
677 match handler.get_operations(query_ctx, &service_name, None).await {
678 Ok(output) => match covert_to_records(output).await {
679 Ok(Some(records)) => match operations_from_records(records, false) {
680 Ok(operations) => {
681 let operations: Vec<String> =
682 operations.into_iter().map(|op| op.name).collect();
683 let total = operations.len();
684 (
685 HttpStatusCode::OK,
686 axum::Json(JaegerAPIResponse {
687 data: Some(JaegerData::OperationsNames(operations)),
688 total,
689 ..Default::default()
690 }),
691 )
692 }
693 Err(err) => {
694 error!(
695 "Failed to get operations for service '{}': {:?}",
696 service_name, err
697 );
698 error_response(err)
699 }
700 },
701 Ok(None) => (HttpStatusCode::OK, axum::Json(JaegerAPIResponse::default())),
702 Err(err) => error_response(err),
703 },
704 Err(err) => handle_query_error(
705 err,
706 &format!("Failed to get operations for service '{}'", service_name),
707 &db,
708 ),
709 }
710}
711
712async fn covert_to_records(output: Output) -> Result<Option<HttpRecordsOutput>> {
713 match output.data {
714 OutputData::Stream(stream) => {
715 let records = HttpRecordsOutput::try_new(
716 stream.schema().clone(),
717 util::collect(stream)
718 .await
719 .context(CollectRecordbatchSnafu)?,
720 )?;
721 debug!(
722 "The query records: {}",
723 serde_json::to_string(&records).unwrap()
724 );
725 Ok(Some(records))
726 }
727 _ => Ok(None),
729 }
730}
731
732fn handle_query_error(
733 err: Error,
734 prompt: &str,
735 db: &str,
736) -> (HttpStatusCode, axum::Json<JaegerAPIResponse>) {
737 if err.status_code() == StatusCode::TableNotFound {
739 warn!(
740 "No trace table '{}' found in database '{}'",
741 TRACE_TABLE_NAME, db
742 );
743 (HttpStatusCode::OK, axum::Json(JaegerAPIResponse::default()))
744 } else {
745 error!("{}: {:?}", prompt, err);
746 error_response(err)
747 }
748}
749
750fn error_response(err: Error) -> (HttpStatusCode, axum::Json<JaegerAPIResponse>) {
751 (
752 status_code_to_http_status(&err.status_code()),
753 axum::Json(JaegerAPIResponse {
754 errors: vec![JaegerAPIError {
755 code: err.status_code() as i32,
756 msg: err.to_string(),
757 ..Default::default()
758 }],
759 ..Default::default()
760 }),
761 )
762}
763
764fn traces_from_records(records: HttpRecordsOutput) -> Result<Vec<Trace>> {
765 let mut trace_id_to_processes: HashMap<String, HashMap<String, String>> = HashMap::new();
767 let mut trace_id_to_spans: BTreeMap<String, Vec<Span>> = BTreeMap::new();
770 let mut service_to_resource_attributes: HashMap<String, Vec<KeyValue>> = HashMap::new();
772
773 let is_span_attributes_flatten = !records
774 .schema
775 .column_schemas
776 .iter()
777 .any(|c| c.name == SPAN_ATTRIBUTES_COLUMN);
778
779 for row in records.rows.into_iter() {
780 let mut span = Span::default();
781 let mut service_name = None;
782 let mut resource_tags = vec![];
783
784 for (idx, cell) in row.into_iter().enumerate() {
785 let column_name = &records.schema.column_schemas[idx].name;
787
788 match column_name.as_str() {
789 TRACE_ID_COLUMN => {
790 if let JsonValue::String(trace_id) = cell {
791 span.trace_id = trace_id.clone();
792 trace_id_to_processes.entry(trace_id).or_default();
793 }
794 }
795 TIMESTAMP_COLUMN => {
796 span.start_time = cell.as_u64().context(InvalidJaegerQuerySnafu {
797 reason: "Failed to convert timestamp to u64".to_string(),
798 })? / 1000;
799 }
800 DURATION_NANO_COLUMN => {
801 span.duration = cell.as_u64().context(InvalidJaegerQuerySnafu {
802 reason: "Failed to convert duration to u64".to_string(),
803 })? / 1000;
804 }
805 SERVICE_NAME_COLUMN => {
806 if let JsonValue::String(name) = cell {
807 service_name = Some(name);
808 }
809 }
810 SPAN_NAME_COLUMN => {
811 if let JsonValue::String(span_name) = cell {
812 span.operation_name = span_name;
813 }
814 }
815 SPAN_ID_COLUMN => {
816 if let JsonValue::String(span_id) = cell {
817 span.span_id = span_id;
818 }
819 }
820 SPAN_ATTRIBUTES_COLUMN => {
821 if let JsonValue::Object(span_attrs) = cell {
824 span.tags.extend(object_to_tags(span_attrs));
825 }
826 }
827 RESOURCE_ATTRIBUTES_COLUMN => {
828 if let JsonValue::Object(mut resource_attrs) = cell {
832 resource_attrs.remove(KEY_SERVICE_NAME);
833 resource_tags = object_to_tags(resource_attrs);
834 }
835 }
836 PARENT_SPAN_ID_COLUMN => {
837 if let JsonValue::String(parent_span_id) = cell
838 && !parent_span_id.is_empty()
839 {
840 span.references.push(Reference {
841 trace_id: span.trace_id.clone(),
842 span_id: parent_span_id,
843 ref_type: REF_TYPE_CHILD_OF.to_string(),
844 });
845 }
846 }
847 SPAN_EVENTS_COLUMN => {
848 if let JsonValue::Array(events) = cell {
849 for event in events {
850 if let JsonValue::Object(mut obj) = event {
851 let Some(action) = obj.get("name").and_then(|v| v.as_str()) else {
852 continue;
853 };
854
855 let Some(t) =
856 obj.get("time").and_then(|t| t.as_str()).and_then(|s| {
857 SPAN_KIND_TIME_FMTS
858 .iter()
859 .find_map(|fmt| {
860 chrono::DateTime::parse_from_str(s, fmt).ok()
861 })
862 .map(|dt| dt.timestamp_micros() as u64)
863 })
864 else {
865 continue;
866 };
867
868 let mut fields = vec![KeyValue {
869 key: "event".to_string(),
870 value_type: ValueType::String,
871 value: Value::String(action.to_string()),
872 }];
873
874 if let Some(JsonValue::Object(attrs)) = obj.remove("attributes") {
876 fields.extend(object_to_tags(attrs));
877 }
878
879 span.logs.push(Log {
880 timestamp: t,
881 fields,
882 });
883 }
884 }
885 }
886 }
887 SCOPE_NAME_COLUMN => {
888 if let JsonValue::String(scope_name) = cell
889 && !scope_name.is_empty()
890 {
891 span.tags.push(KeyValue {
892 key: KEY_OTEL_SCOPE_NAME.to_string(),
893 value_type: ValueType::String,
894 value: Value::String(scope_name),
895 });
896 }
897 }
898 SCOPE_VERSION_COLUMN => {
899 if let JsonValue::String(scope_version) = cell
900 && !scope_version.is_empty()
901 {
902 span.tags.push(KeyValue {
903 key: KEY_OTEL_SCOPE_VERSION.to_string(),
904 value_type: ValueType::String,
905 value: Value::String(scope_version),
906 });
907 }
908 }
909 SPAN_KIND_COLUMN => {
910 if let JsonValue::String(span_kind) = cell
911 && !span_kind.is_empty()
912 {
913 span.tags.push(KeyValue {
914 key: KEY_SPAN_KIND.to_string(),
915 value_type: ValueType::String,
916 value: Value::String(normalize_span_kind(&span_kind)),
917 });
918 }
919 }
920 SPAN_STATUS_CODE => {
921 if let JsonValue::String(span_status) = cell
922 && span_status != SPAN_STATUS_UNSET
923 && !span_status.is_empty()
924 {
925 span.tags.push(KeyValue {
926 key: KEY_OTEL_STATUS_CODE.to_string(),
927 value_type: ValueType::String,
928 value: Value::String(normalize_status_code(&span_status)),
929 });
930 if span_status == SPAN_STATUS_ERROR {
932 span.tags.push(KeyValue {
933 key: KEY_OTEL_STATUS_ERROR_KEY.to_string(),
934 value_type: ValueType::Boolean,
935 value: Value::Boolean(true),
936 });
937 }
938 }
939 }
940
941 SPAN_STATUS_MESSAGE_COLUMN => {
942 if let JsonValue::String(span_status_message) = cell
943 && !span_status_message.is_empty()
944 {
945 span.tags.push(KeyValue {
946 key: KEY_OTEL_STATUS_MESSAGE.to_string(),
947 value_type: ValueType::String,
948 value: Value::String(span_status_message),
949 });
950 }
951 }
952
953 TRACE_STATE_COLUMN => {
954 if let JsonValue::String(trace_state) = cell
955 && !trace_state.is_empty()
956 {
957 span.tags.push(KeyValue {
958 key: KEY_OTEL_TRACE_STATE.to_string(),
959 value_type: ValueType::String,
960 value: Value::String(trace_state),
961 });
962 }
963 }
964
965 _ => {
966 if is_span_attributes_flatten {
968 const SPAN_ATTR_PREFIX: &str = "span_attributes.";
969 const RESOURCE_ATTR_PREFIX: &str = "resource_attributes.";
970 if column_name.starts_with(SPAN_ATTR_PREFIX) {
972 if let Some(keyvalue) = to_keyvalue(
973 column_name
974 .strip_prefix(SPAN_ATTR_PREFIX)
975 .unwrap_or_default()
976 .to_string(),
977 cell,
978 ) {
979 span.tags.push(keyvalue);
980 }
981 } else if column_name.starts_with(RESOURCE_ATTR_PREFIX)
982 && let Some(keyvalue) = to_keyvalue(
983 column_name
984 .strip_prefix(RESOURCE_ATTR_PREFIX)
985 .unwrap_or_default()
986 .to_string(),
987 cell,
988 )
989 {
990 resource_tags.push(keyvalue);
991 }
992 }
993 }
994 }
995 }
996
997 if let Some(service_name) = service_name {
998 if !service_to_resource_attributes.contains_key(&service_name) {
999 service_to_resource_attributes.insert(service_name.clone(), resource_tags);
1000 }
1001
1002 if let Some(process) = trace_id_to_processes.get_mut(&span.trace_id) {
1003 if let Some(process_id) = process.get(&service_name) {
1004 span.process_id = process_id.clone();
1005 } else {
1006 let process_id = format!("p{}", process.len() + 1);
1008 process.insert(service_name, process_id.clone());
1009 span.process_id = process_id;
1010 }
1011 }
1012 }
1013
1014 span.tags.sort_by(|a, b| a.key.cmp(&b.key));
1016
1017 if let Some(spans) = trace_id_to_spans.get_mut(&span.trace_id) {
1018 spans.push(span);
1019 } else {
1020 trace_id_to_spans.insert(span.trace_id.clone(), vec![span]);
1021 }
1022 }
1023
1024 let mut traces = Vec::new();
1025 for (trace_id, spans) in trace_id_to_spans {
1026 let mut trace = Trace {
1027 trace_id,
1028 spans,
1029 ..Default::default()
1030 };
1031
1032 if let Some(processes) = trace_id_to_processes.remove(&trace.trace_id) {
1033 let mut process_id_to_process = HashMap::new();
1034 for (service_name, process_id) in processes.into_iter() {
1035 let tags = service_to_resource_attributes
1036 .remove(&service_name)
1037 .unwrap_or_default();
1038 process_id_to_process.insert(process_id, Process { service_name, tags });
1039 }
1040 trace.processes = process_id_to_process;
1041 }
1042 traces.push(trace);
1043 }
1044
1045 Ok(traces)
1046}
1047
1048fn to_keyvalue(key: String, value: JsonValue) -> Option<KeyValue> {
1049 match value {
1050 JsonValue::String(value) => Some(KeyValue {
1051 key,
1052 value_type: ValueType::String,
1053 value: Value::String(value.clone()),
1054 }),
1055 JsonValue::Number(value) => Some(KeyValue {
1056 key,
1057 value_type: ValueType::Int64,
1058 value: Value::Int64(value.as_i64().unwrap_or(0)),
1059 }),
1060 JsonValue::Bool(value) => Some(KeyValue {
1061 key,
1062 value_type: ValueType::Boolean,
1063 value: Value::Boolean(value),
1064 }),
1065 JsonValue::Array(value) => Some(KeyValue {
1066 key,
1067 value_type: ValueType::String,
1068 value: Value::String(serde_json::to_string(&value).unwrap()),
1069 }),
1070 JsonValue::Object(value) => Some(KeyValue {
1071 key,
1072 value_type: ValueType::String,
1073 value: Value::String(serde_json::to_string(&value).unwrap()),
1074 }),
1075 JsonValue::Null => None,
1076 }
1077}
1078
1079fn object_to_tags(object: serde_json::map::Map<String, JsonValue>) -> Vec<KeyValue> {
1080 object
1081 .into_iter()
1082 .filter_map(|(key, value)| to_keyvalue(key, value))
1083 .collect()
1084}
1085
1086fn services_from_records(records: HttpRecordsOutput) -> Result<Vec<String>> {
1087 let expected_schema = vec![(SERVICE_NAME_COLUMN, "String")];
1088 check_schema(&records, &expected_schema)?;
1089
1090 let mut services = Vec::with_capacity(records.total_rows);
1091 for row in records.rows.into_iter() {
1092 for value in row.into_iter() {
1093 if let JsonValue::String(service_name) = value {
1094 services.push(service_name);
1095 }
1096 }
1097 }
1098 Ok(services)
1099}
1100
1101fn operations_from_records(
1103 records: HttpRecordsOutput,
1104 contain_span_kind: bool,
1105) -> Result<Vec<Operation>> {
1106 let expected_schema = vec![(SPAN_NAME_COLUMN, "String"), (SPAN_KIND_COLUMN, "String")];
1107 check_schema(&records, &expected_schema)?;
1108
1109 let mut operations = Vec::with_capacity(records.total_rows);
1110 for row in records.rows.into_iter() {
1111 let mut row_iter = row.into_iter();
1112 if let Some(JsonValue::String(operation)) = row_iter.next() {
1113 let mut operation = Operation {
1114 name: operation,
1115 span_kind: None,
1116 };
1117 if contain_span_kind {
1118 if let Some(JsonValue::String(span_kind)) = row_iter.next() {
1119 operation.span_kind = Some(normalize_span_kind(&span_kind));
1120 }
1121 } else {
1122 row_iter.next();
1124 }
1125 operations.push(operation);
1126 }
1127 }
1128
1129 Ok(operations)
1130}
1131
1132fn check_schema(records: &HttpRecordsOutput, expected_schema: &[(&str, &str)]) -> Result<()> {
1134 for (i, column) in records.schema.column_schemas.iter().enumerate() {
1135 if column.name != expected_schema[i].0 || column.data_type != expected_schema[i].1 {
1136 InvalidJaegerQuerySnafu {
1137 reason: "query result schema is not correct".to_string(),
1138 }
1139 .fail()?
1140 }
1141 }
1142 Ok(())
1143}
1144
1145fn normalize_span_kind(span_kind: &str) -> String {
1148 if let Some(stripped) = span_kind.strip_prefix(SPAN_KIND_PREFIX) {
1150 stripped.to_lowercase()
1151 } else {
1152 span_kind.to_lowercase()
1154 }
1155}
1156
1157fn normalize_status_code(status_code: &str) -> String {
1160 if let Some(stripped) = status_code.strip_prefix(SPAN_STATUS_PREFIX) {
1162 stripped.to_string()
1163 } else {
1164 status_code.to_string()
1166 }
1167}
1168
1169fn convert_string_to_number(input: &serde_json::Value) -> Option<serde_json::Value> {
1170 if let Some(data) = input.as_str() {
1171 if let Ok(number) = data.parse::<i64>() {
1172 return Some(serde_json::Value::Number(serde_json::Number::from(number)));
1173 }
1174 if let Ok(number) = data.parse::<f64>()
1175 && let Some(number) = serde_json::Number::from_f64(number)
1176 {
1177 return Some(serde_json::Value::Number(number));
1178 }
1179 }
1180
1181 None
1182}
1183
1184fn convert_string_to_boolean(input: &serde_json::Value) -> Option<serde_json::Value> {
1185 if let Some(data) = input.as_str() {
1186 if data == "true" {
1187 return Some(serde_json::Value::Bool(true));
1188 }
1189 if data == "false" {
1190 return Some(serde_json::Value::Bool(false));
1191 }
1192 }
1193
1194 None
1195}
1196
1197#[cfg(test)]
1198mod tests {
1199 use serde_json::{Number, Value as JsonValue, json};
1200
1201 use super::*;
1202 use crate::http::{ColumnSchema, HttpRecordsOutput, OutputSchema};
1203
1204 #[test]
1205 fn test_services_from_records() {
1206 let tests = vec![(
1208 HttpRecordsOutput {
1209 schema: OutputSchema {
1210 column_schemas: vec![ColumnSchema {
1211 name: "service_name".to_string(),
1212 data_type: "String".to_string(),
1213 }],
1214 },
1215 rows: vec![
1216 vec![JsonValue::String("test-service-0".to_string())],
1217 vec![JsonValue::String("test-service-1".to_string())],
1218 ],
1219 total_rows: 2,
1220 metrics: HashMap::new(),
1221 },
1222 vec!["test-service-0".to_string(), "test-service-1".to_string()],
1223 )];
1224
1225 for (records, expected) in tests {
1226 let services = services_from_records(records).unwrap();
1227 assert_eq!(services, expected);
1228 }
1229 }
1230
1231 #[test]
1232 fn test_operations_from_records() {
1233 let tests = vec![
1235 (
1236 HttpRecordsOutput {
1237 schema: OutputSchema {
1238 column_schemas: vec![
1239 ColumnSchema {
1240 name: "span_name".to_string(),
1241 data_type: "String".to_string(),
1242 },
1243 ColumnSchema {
1244 name: "span_kind".to_string(),
1245 data_type: "String".to_string(),
1246 },
1247 ],
1248 },
1249 rows: vec![
1250 vec![
1251 JsonValue::String("access-mysql".to_string()),
1252 JsonValue::String("SPAN_KIND_SERVER".to_string()),
1253 ],
1254 vec![
1255 JsonValue::String("access-redis".to_string()),
1256 JsonValue::String("SPAN_KIND_CLIENT".to_string()),
1257 ],
1258 ],
1259 total_rows: 2,
1260 metrics: HashMap::new(),
1261 },
1262 false,
1263 vec![
1264 Operation {
1265 name: "access-mysql".to_string(),
1266 span_kind: None,
1267 },
1268 Operation {
1269 name: "access-redis".to_string(),
1270 span_kind: None,
1271 },
1272 ],
1273 ),
1274 (
1275 HttpRecordsOutput {
1276 schema: OutputSchema {
1277 column_schemas: vec![
1278 ColumnSchema {
1279 name: "span_name".to_string(),
1280 data_type: "String".to_string(),
1281 },
1282 ColumnSchema {
1283 name: "span_kind".to_string(),
1284 data_type: "String".to_string(),
1285 },
1286 ],
1287 },
1288 rows: vec![
1289 vec![
1290 JsonValue::String("access-mysql".to_string()),
1291 JsonValue::String("SPAN_KIND_SERVER".to_string()),
1292 ],
1293 vec![
1294 JsonValue::String("access-redis".to_string()),
1295 JsonValue::String("SPAN_KIND_CLIENT".to_string()),
1296 ],
1297 ],
1298 total_rows: 2,
1299 metrics: HashMap::new(),
1300 },
1301 true,
1302 vec![
1303 Operation {
1304 name: "access-mysql".to_string(),
1305 span_kind: Some("server".to_string()),
1306 },
1307 Operation {
1308 name: "access-redis".to_string(),
1309 span_kind: Some("client".to_string()),
1310 },
1311 ],
1312 ),
1313 ];
1314
1315 for (records, contain_span_kind, expected) in tests {
1316 let operations = operations_from_records(records, contain_span_kind).unwrap();
1317 assert_eq!(operations, expected);
1318 }
1319 }
1320
1321 #[test]
1322 fn test_traces_from_records() {
1323 let tests = vec![(
1325 HttpRecordsOutput {
1326 schema: OutputSchema {
1327 column_schemas: vec![
1328 ColumnSchema {
1329 name: "trace_id".to_string(),
1330 data_type: "String".to_string(),
1331 },
1332 ColumnSchema {
1333 name: "timestamp".to_string(),
1334 data_type: "TimestampNanosecond".to_string(),
1335 },
1336 ColumnSchema {
1337 name: "duration_nano".to_string(),
1338 data_type: "UInt64".to_string(),
1339 },
1340 ColumnSchema {
1341 name: "service_name".to_string(),
1342 data_type: "String".to_string(),
1343 },
1344 ColumnSchema {
1345 name: "span_name".to_string(),
1346 data_type: "String".to_string(),
1347 },
1348 ColumnSchema {
1349 name: "span_id".to_string(),
1350 data_type: "String".to_string(),
1351 },
1352 ColumnSchema {
1353 name: "span_attributes".to_string(),
1354 data_type: "Json".to_string(),
1355 },
1356 ],
1357 },
1358 rows: vec![
1359 vec![
1360 JsonValue::String("5611dce1bc9ebed65352d99a027b08ea".to_string()),
1361 JsonValue::Number(Number::from_u128(1738726754492422000).unwrap()),
1362 JsonValue::Number(Number::from_u128(100000000).unwrap()),
1363 JsonValue::String("test-service-0".to_string()),
1364 JsonValue::String("access-mysql".to_string()),
1365 JsonValue::String("008421dbbd33a3e9".to_string()),
1366 JsonValue::Object(
1367 json!({
1368 "operation.type": "access-mysql",
1369 })
1370 .as_object()
1371 .unwrap()
1372 .clone(),
1373 ),
1374 ],
1375 vec![
1376 JsonValue::String("5611dce1bc9ebed65352d99a027b08ea".to_string()),
1377 JsonValue::Number(Number::from_u128(1738726754642422000).unwrap()),
1378 JsonValue::Number(Number::from_u128(100000000).unwrap()),
1379 JsonValue::String("test-service-0".to_string()),
1380 JsonValue::String("access-redis".to_string()),
1381 JsonValue::String("ffa03416a7b9ea48".to_string()),
1382 JsonValue::Object(
1383 json!({
1384 "operation.type": "access-redis",
1385 })
1386 .as_object()
1387 .unwrap()
1388 .clone(),
1389 ),
1390 ],
1391 ],
1392 total_rows: 2,
1393 metrics: HashMap::new(),
1394 },
1395 vec![Trace {
1396 trace_id: "5611dce1bc9ebed65352d99a027b08ea".to_string(),
1397 spans: vec![
1398 Span {
1399 trace_id: "5611dce1bc9ebed65352d99a027b08ea".to_string(),
1400 span_id: "008421dbbd33a3e9".to_string(),
1401 operation_name: "access-mysql".to_string(),
1402 start_time: 1738726754492422,
1403 duration: 100000,
1404 tags: vec![KeyValue {
1405 key: "operation.type".to_string(),
1406 value_type: ValueType::String,
1407 value: Value::String("access-mysql".to_string()),
1408 }],
1409 process_id: "p1".to_string(),
1410 ..Default::default()
1411 },
1412 Span {
1413 trace_id: "5611dce1bc9ebed65352d99a027b08ea".to_string(),
1414 span_id: "ffa03416a7b9ea48".to_string(),
1415 operation_name: "access-redis".to_string(),
1416 start_time: 1738726754642422,
1417 duration: 100000,
1418 tags: vec![KeyValue {
1419 key: "operation.type".to_string(),
1420 value_type: ValueType::String,
1421 value: Value::String("access-redis".to_string()),
1422 }],
1423 process_id: "p1".to_string(),
1424 ..Default::default()
1425 },
1426 ],
1427 processes: HashMap::from([(
1428 "p1".to_string(),
1429 Process {
1430 service_name: "test-service-0".to_string(),
1431 tags: vec![],
1432 },
1433 )]),
1434 ..Default::default()
1435 }],
1436 )];
1437
1438 for (records, expected) in tests {
1439 let traces = traces_from_records(records).unwrap();
1440 assert_eq!(traces, expected);
1441 }
1442 }
1443
1444 #[test]
1445 fn test_traces_from_v1_records() {
1446 let tests = vec![(
1448 HttpRecordsOutput {
1449 schema: OutputSchema {
1450 column_schemas: vec![
1451 ColumnSchema {
1452 name: "trace_id".to_string(),
1453 data_type: "String".to_string(),
1454 },
1455 ColumnSchema {
1456 name: "timestamp".to_string(),
1457 data_type: "TimestampNanosecond".to_string(),
1458 },
1459 ColumnSchema {
1460 name: "duration_nano".to_string(),
1461 data_type: "UInt64".to_string(),
1462 },
1463 ColumnSchema {
1464 name: "service_name".to_string(),
1465 data_type: "String".to_string(),
1466 },
1467 ColumnSchema {
1468 name: "span_name".to_string(),
1469 data_type: "String".to_string(),
1470 },
1471 ColumnSchema {
1472 name: "span_id".to_string(),
1473 data_type: "String".to_string(),
1474 },
1475 ColumnSchema {
1476 name: "span_attributes.http.request.method".to_string(),
1477 data_type: "String".to_string(),
1478 },
1479 ColumnSchema {
1480 name: "span_attributes.http.request.url".to_string(),
1481 data_type: "String".to_string(),
1482 },
1483 ColumnSchema {
1484 name: "span_attributes.http.status_code".to_string(),
1485 data_type: "UInt64".to_string(),
1486 },
1487 ],
1488 },
1489 rows: vec![
1490 vec![
1491 JsonValue::String("5611dce1bc9ebed65352d99a027b08ea".to_string()),
1492 JsonValue::Number(Number::from_u128(1738726754492422000).unwrap()),
1493 JsonValue::Number(Number::from_u128(100000000).unwrap()),
1494 JsonValue::String("test-service-0".to_string()),
1495 JsonValue::String("access-mysql".to_string()),
1496 JsonValue::String("008421dbbd33a3e9".to_string()),
1497 JsonValue::String("GET".to_string()),
1498 JsonValue::String("/data".to_string()),
1499 JsonValue::Number(Number::from_u128(200).unwrap()),
1500 ],
1501 vec![
1502 JsonValue::String("5611dce1bc9ebed65352d99a027b08ea".to_string()),
1503 JsonValue::Number(Number::from_u128(1738726754642422000).unwrap()),
1504 JsonValue::Number(Number::from_u128(100000000).unwrap()),
1505 JsonValue::String("test-service-0".to_string()),
1506 JsonValue::String("access-redis".to_string()),
1507 JsonValue::String("ffa03416a7b9ea48".to_string()),
1508 JsonValue::String("POST".to_string()),
1509 JsonValue::String("/create".to_string()),
1510 JsonValue::Number(Number::from_u128(400).unwrap()),
1511 ],
1512 ],
1513 total_rows: 2,
1514 metrics: HashMap::new(),
1515 },
1516 vec![Trace {
1517 trace_id: "5611dce1bc9ebed65352d99a027b08ea".to_string(),
1518 spans: vec![
1519 Span {
1520 trace_id: "5611dce1bc9ebed65352d99a027b08ea".to_string(),
1521 span_id: "008421dbbd33a3e9".to_string(),
1522 operation_name: "access-mysql".to_string(),
1523 start_time: 1738726754492422,
1524 duration: 100000,
1525 tags: vec![
1526 KeyValue {
1527 key: "http.request.method".to_string(),
1528 value_type: ValueType::String,
1529 value: Value::String("GET".to_string()),
1530 },
1531 KeyValue {
1532 key: "http.request.url".to_string(),
1533 value_type: ValueType::String,
1534 value: Value::String("/data".to_string()),
1535 },
1536 KeyValue {
1537 key: "http.status_code".to_string(),
1538 value_type: ValueType::Int64,
1539 value: Value::Int64(200),
1540 },
1541 ],
1542 process_id: "p1".to_string(),
1543 ..Default::default()
1544 },
1545 Span {
1546 trace_id: "5611dce1bc9ebed65352d99a027b08ea".to_string(),
1547 span_id: "ffa03416a7b9ea48".to_string(),
1548 operation_name: "access-redis".to_string(),
1549 start_time: 1738726754642422,
1550 duration: 100000,
1551 tags: vec![
1552 KeyValue {
1553 key: "http.request.method".to_string(),
1554 value_type: ValueType::String,
1555 value: Value::String("POST".to_string()),
1556 },
1557 KeyValue {
1558 key: "http.request.url".to_string(),
1559 value_type: ValueType::String,
1560 value: Value::String("/create".to_string()),
1561 },
1562 KeyValue {
1563 key: "http.status_code".to_string(),
1564 value_type: ValueType::Int64,
1565 value: Value::Int64(400),
1566 },
1567 ],
1568 process_id: "p1".to_string(),
1569 ..Default::default()
1570 },
1571 ],
1572 processes: HashMap::from([(
1573 "p1".to_string(),
1574 Process {
1575 service_name: "test-service-0".to_string(),
1576 tags: vec![],
1577 },
1578 )]),
1579 ..Default::default()
1580 }],
1581 )];
1582
1583 for (records, expected) in tests {
1584 let traces = traces_from_records(records).unwrap();
1585 assert_eq!(traces, expected);
1586 }
1587 }
1588
1589 #[test]
1590 fn test_from_jaeger_query_params() {
1591 let tests = vec![
1593 (
1594 JaegerQueryParams {
1595 service_name: Some("test-service-0".to_string()),
1596 ..Default::default()
1597 },
1598 QueryTraceParams {
1599 service_name: "test-service-0".to_string(),
1600 ..Default::default()
1601 },
1602 ),
1603 (
1604 JaegerQueryParams {
1605 service_name: Some("test-service-0".to_string()),
1606 operation_name: Some("access-mysql".to_string()),
1607 start: Some(1738726754492422),
1608 end: Some(1738726754642422),
1609 max_duration: Some("100ms".to_string()),
1610 min_duration: Some("50ms".to_string()),
1611 limit: Some(10),
1612 tags: Some("{\"http.status_code\":\"200\",\"latency\":\"11.234\",\"error\":\"false\",\"http.method\":\"GET\",\"http.path\":\"/api/v1/users\"}".to_string()),
1613 ..Default::default()
1614 },
1615 QueryTraceParams {
1616 service_name: "test-service-0".to_string(),
1617 operation_name: Some("access-mysql".to_string()),
1618 start_time: Some(1738726754492422000),
1619 end_time: Some(1738726754642422000),
1620 min_duration: Some(50000000),
1621 max_duration: Some(100000000),
1622 limit: Some(10),
1623 tags: Some(HashMap::from([
1624 ("http.status_code".to_string(), JsonValue::Number(Number::from(200))),
1625 ("latency".to_string(), JsonValue::Number(Number::from_f64(11.234).unwrap())),
1626 ("error".to_string(), JsonValue::Bool(false)),
1627 ("http.method".to_string(), JsonValue::String("GET".to_string())),
1628 ("http.path".to_string(), JsonValue::String("/api/v1/users".to_string())),
1629 ])),
1630 user_agent: TraceUserAgent::Jaeger,
1631 },
1632 ),
1633 ];
1634
1635 for (query_params, expected) in tests {
1636 let query_params = QueryTraceParams::from_jaeger_query_params(query_params).unwrap();
1637 assert_eq!(query_params, expected);
1638 }
1639 }
1640
1641 #[test]
1642 fn test_check_schema() {
1643 let tests = vec![(
1645 HttpRecordsOutput {
1646 schema: OutputSchema {
1647 column_schemas: vec![
1648 ColumnSchema {
1649 name: "trace_id".to_string(),
1650 data_type: "String".to_string(),
1651 },
1652 ColumnSchema {
1653 name: "timestamp".to_string(),
1654 data_type: "TimestampNanosecond".to_string(),
1655 },
1656 ColumnSchema {
1657 name: "duration_nano".to_string(),
1658 data_type: "UInt64".to_string(),
1659 },
1660 ColumnSchema {
1661 name: "service_name".to_string(),
1662 data_type: "String".to_string(),
1663 },
1664 ColumnSchema {
1665 name: "span_name".to_string(),
1666 data_type: "String".to_string(),
1667 },
1668 ColumnSchema {
1669 name: "span_id".to_string(),
1670 data_type: "String".to_string(),
1671 },
1672 ColumnSchema {
1673 name: "span_attributes".to_string(),
1674 data_type: "Json".to_string(),
1675 },
1676 ],
1677 },
1678 rows: vec![],
1679 total_rows: 0,
1680 metrics: HashMap::new(),
1681 },
1682 vec![
1683 (TRACE_ID_COLUMN, "String"),
1684 (TIMESTAMP_COLUMN, "TimestampNanosecond"),
1685 (DURATION_NANO_COLUMN, "UInt64"),
1686 (SERVICE_NAME_COLUMN, "String"),
1687 (SPAN_NAME_COLUMN, "String"),
1688 (SPAN_ID_COLUMN, "String"),
1689 (SPAN_ATTRIBUTES_COLUMN, "Json"),
1690 ],
1691 true,
1692 )];
1693
1694 for (records, expected_schema, is_ok) in tests {
1695 let result = check_schema(&records, &expected_schema);
1696 assert_eq!(result.is_ok(), is_ok);
1697 }
1698 }
1699
1700 #[test]
1701 fn test_normalize_span_kind() {
1702 let tests = vec![
1703 ("SPAN_KIND_SERVER".to_string(), "server".to_string()),
1704 ("SPAN_KIND_CLIENT".to_string(), "client".to_string()),
1705 ];
1706
1707 for (input, expected) in tests {
1708 let result = normalize_span_kind(&input);
1709 assert_eq!(result, expected);
1710 }
1711 }
1712
1713 #[test]
1714 fn test_convert_string_to_number() {
1715 let tests = vec![
1716 (
1717 JsonValue::String("123".to_string()),
1718 Some(JsonValue::Number(Number::from(123))),
1719 ),
1720 (
1721 JsonValue::String("123.456".to_string()),
1722 Some(JsonValue::Number(Number::from_f64(123.456).unwrap())),
1723 ),
1724 ];
1725
1726 for (input, expected) in tests {
1727 let result = convert_string_to_number(&input);
1728 assert_eq!(result, expected);
1729 }
1730 }
1731
1732 #[test]
1733 fn test_convert_string_to_boolean() {
1734 let tests = vec![
1735 (
1736 JsonValue::String("true".to_string()),
1737 Some(JsonValue::Bool(true)),
1738 ),
1739 (
1740 JsonValue::String("false".to_string()),
1741 Some(JsonValue::Bool(false)),
1742 ),
1743 ];
1744
1745 for (input, expected) in tests {
1746 let result = convert_string_to_boolean(&input);
1747 assert_eq!(result, expected);
1748 }
1749 }
1750}