1use std::collections::HashMap;
16use std::fmt;
17use std::str::FromStr;
18use std::sync::Arc;
19
20use axum::Extension;
21use axum::extract::{Path, Query, State};
22use axum::http::{HeaderMap, StatusCode as HttpStatusCode};
23use axum::response::IntoResponse;
24use chrono::Utc;
25use common_catalog::consts::{PARENT_SPAN_ID_COLUMN, TRACE_TABLE_NAME};
26use common_error::ext::ErrorExt;
27use common_error::status_code::StatusCode;
28use common_query::{Output, OutputData};
29use common_recordbatch::util;
30use common_telemetry::{debug, error, tracing, warn};
31use serde::{Deserialize, Deserializer, Serialize, de};
32use serde_json::Value as JsonValue;
33use session::context::{Channel, QueryContext};
34use snafu::{OptionExt, ResultExt};
35
36use crate::error::{
37 CollectRecordbatchSnafu, Error, InvalidJaegerQuerySnafu, Result, status_code_to_http_status,
38};
39use crate::http::HttpRecordsOutput;
40use crate::http::extractor::TraceTableName;
41use crate::metrics::METRIC_JAEGER_QUERY_ELAPSED;
42use crate::otlp::trace::{
43 DURATION_NANO_COLUMN, KEY_OTEL_SCOPE_NAME, KEY_OTEL_SCOPE_VERSION, KEY_OTEL_STATUS_CODE,
44 KEY_SERVICE_NAME, KEY_SPAN_KIND, RESOURCE_ATTRIBUTES_COLUMN, SCOPE_NAME_COLUMN,
45 SCOPE_VERSION_COLUMN, SERVICE_NAME_COLUMN, SPAN_ATTRIBUTES_COLUMN, SPAN_EVENTS_COLUMN,
46 SPAN_ID_COLUMN, SPAN_KIND_COLUMN, SPAN_KIND_PREFIX, SPAN_NAME_COLUMN, SPAN_STATUS_CODE,
47 SPAN_STATUS_PREFIX, SPAN_STATUS_UNSET, TIMESTAMP_COLUMN, TRACE_ID_COLUMN,
48};
49use crate::query_handler::JaegerQueryHandlerRef;
50
51pub const JAEGER_QUERY_TABLE_NAME_KEY: &str = "jaeger_query_table_name";
52
53const REF_TYPE_CHILD_OF: &str = "CHILD_OF";
54const SPAN_KIND_TIME_FMTS: [&str; 2] = ["%Y-%m-%d %H:%M:%S%.6f%z", "%Y-%m-%d %H:%M:%S%.9f%z"];
55pub const JAEGER_TIME_RANGE_FOR_OPERATIONS_HEADER: &str = "x-greptime-jaeger-query-time-range";
56
57#[derive(Default, Debug, Serialize, Deserialize, PartialEq)]
60pub struct JaegerAPIResponse {
61 pub data: Option<JaegerData>,
62 pub total: usize,
63 pub limit: usize,
64 pub offset: usize,
65 pub errors: Vec<JaegerAPIError>,
66}
67
68#[derive(Debug, Serialize, Deserialize, PartialEq)]
70#[serde(untagged)]
71pub enum JaegerData {
72 ServiceNames(Vec<String>),
73 OperationsNames(Vec<String>),
74 Operations(Vec<Operation>),
75 Traces(Vec<Trace>),
76}
77
78#[derive(Default, Debug, Serialize, Deserialize, PartialEq)]
80#[serde(rename_all = "camelCase")]
81pub struct JaegerAPIError {
82 pub code: i32,
83 pub msg: String,
84 #[serde(skip_serializing_if = "Option::is_none")]
85 pub trace_id: Option<String>,
86}
87
88#[derive(Debug, Default, Serialize, Deserialize, PartialEq)]
90#[serde(rename_all = "camelCase")]
91pub struct Operation {
92 pub name: String,
93 #[serde(skip_serializing_if = "Option::is_none")]
94 pub span_kind: Option<String>,
95}
96
97#[derive(Debug, Default, Serialize, Deserialize, PartialEq)]
99#[serde(rename_all = "camelCase")]
100pub struct Trace {
101 #[serde(rename = "traceID")]
102 pub trace_id: String,
103 pub spans: Vec<Span>,
104
105 #[serde(skip_serializing_if = "HashMap::is_empty")]
106 pub processes: HashMap<String, Process>,
107
108 #[serde(skip_serializing_if = "Vec::is_empty")]
109 pub warnings: Vec<String>,
110}
111
112#[derive(Debug, Default, Serialize, Deserialize, PartialEq)]
114#[serde(rename_all = "camelCase")]
115pub struct Span {
116 #[serde(rename = "traceID")]
117 pub trace_id: String,
118
119 #[serde(rename = "spanID")]
120 pub span_id: String,
121
122 #[serde(rename = "parentSpanID")]
123 #[serde(skip_serializing_if = "String::is_empty")]
124 pub parent_span_id: String,
125
126 #[serde(skip_serializing_if = "Option::is_none")]
127 pub flags: Option<u32>,
128
129 pub operation_name: String,
130 pub references: Vec<Reference>,
131 pub start_time: u64, pub duration: u64, pub tags: Vec<KeyValue>,
134 pub logs: Vec<Log>,
135
136 #[serde(rename = "processID")]
137 #[serde(skip_serializing_if = "String::is_empty")]
138 pub process_id: String,
139
140 #[serde(skip_serializing_if = "Option::is_none")]
141 pub process: Option<Process>,
142
143 #[serde(skip_serializing_if = "Vec::is_empty")]
144 pub warnings: Vec<String>,
145}
146
147#[derive(Debug, Serialize, Deserialize, PartialEq)]
149#[serde(rename_all = "camelCase")]
150pub struct Reference {
151 #[serde(rename = "traceID")]
152 pub trace_id: String,
153 #[serde(rename = "spanID")]
154 pub span_id: String,
155 pub ref_type: String,
156}
157
158#[derive(Debug, Serialize, Deserialize, PartialEq)]
160#[serde(rename_all = "camelCase")]
161pub struct Process {
162 pub service_name: String,
163 pub tags: Vec<KeyValue>,
164}
165
166#[derive(Debug, Serialize, Deserialize, PartialEq)]
168#[serde(rename_all = "camelCase")]
169pub struct Log {
170 pub timestamp: u64,
171 pub fields: Vec<KeyValue>,
172}
173
174#[derive(Debug, Serialize, Deserialize, PartialEq)]
176#[serde(rename_all = "camelCase")]
177pub struct KeyValue {
178 pub key: String,
179 #[serde(rename = "type")]
180 pub value_type: ValueType,
181 pub value: Value,
182}
183
184#[derive(Debug, Serialize, Deserialize, PartialEq)]
186#[serde(untagged)]
187#[serde(rename_all = "camelCase")]
188pub enum Value {
189 String(String),
190 Int64(i64),
191 Float64(f64),
192 Boolean(bool),
193 Binary(Vec<u8>),
194}
195
196#[derive(Debug, Serialize, Deserialize, PartialEq)]
198#[serde(rename_all = "lowercase")]
199pub enum ValueType {
200 String,
201 Int64,
202 Float64,
203 Boolean,
204 Binary,
205}
206
207#[derive(Default, Debug, Serialize, Deserialize)]
209#[serde(rename_all = "camelCase")]
210pub struct JaegerQueryParams {
211 #[serde(rename = "service")]
213 pub service_name: Option<String>,
214
215 #[serde(rename = "operation")]
217 pub operation_name: Option<String>,
218
219 #[serde(default, deserialize_with = "empty_string_as_none")]
221 pub limit: Option<usize>,
222
223 pub start: Option<i64>,
225
226 pub end: Option<i64>,
228
229 #[serde(default, deserialize_with = "empty_string_as_none")]
231 pub max_duration: Option<String>,
232
233 #[serde(default, deserialize_with = "empty_string_as_none")]
235 pub min_duration: Option<String>,
236
237 pub tags: Option<String>,
241
242 pub span_kind: Option<String>,
244}
245
246fn empty_string_as_none<'de, D, T>(de: D) -> Result<Option<T>, D::Error>
248where
249 D: Deserializer<'de>,
250 T: FromStr,
251 T::Err: fmt::Display,
252{
253 let opt = Option::<String>::deserialize(de)?;
254 match opt.as_deref() {
255 None | Some("") => Ok(None),
256 Some(s) => FromStr::from_str(s).map_err(de::Error::custom).map(Some),
257 }
258}
259
260fn update_query_context(query_ctx: &mut QueryContext, table_name: Option<String>) {
261 query_ctx.set_channel(Channel::Jaeger);
263 if let Some(table) = table_name {
264 query_ctx.set_extension(JAEGER_QUERY_TABLE_NAME_KEY, table);
265 }
266}
267
268impl QueryTraceParams {
269 fn from_jaeger_query_params(query_params: JaegerQueryParams) -> Result<Self> {
270 let mut internal_query_params: QueryTraceParams = QueryTraceParams {
271 service_name: query_params.service_name.context(InvalidJaegerQuerySnafu {
272 reason: "service_name is required".to_string(),
273 })?,
274 operation_name: query_params.operation_name,
275 start_time: query_params.start.map(|start| start * 1000),
277 end_time: query_params.end.map(|end| end * 1000),
278 ..Default::default()
279 };
280
281 if let Some(max_duration) = query_params.max_duration {
282 let duration = humantime::parse_duration(&max_duration).map_err(|e| {
283 InvalidJaegerQuerySnafu {
284 reason: format!("parse maxDuration '{}' failed: {}", max_duration, e),
285 }
286 .build()
287 })?;
288 internal_query_params.max_duration = Some(duration.as_nanos() as u64);
289 }
290
291 if let Some(min_duration) = query_params.min_duration {
292 let duration = humantime::parse_duration(&min_duration).map_err(|e| {
293 InvalidJaegerQuerySnafu {
294 reason: format!("parse minDuration '{}' failed: {}", min_duration, e),
295 }
296 .build()
297 })?;
298 internal_query_params.min_duration = Some(duration.as_nanos() as u64);
299 }
300
301 if let Some(tags) = query_params.tags {
302 let mut tags_map: HashMap<String, JsonValue> =
304 serde_json::from_str(&tags).map_err(|e| {
305 InvalidJaegerQuerySnafu {
306 reason: format!("parse tags '{}' failed: {}", tags, e),
307 }
308 .build()
309 })?;
310 for (_, v) in tags_map.iter_mut() {
311 if let Some(number) = convert_string_to_number(v) {
312 *v = number;
313 }
314 if let Some(boolean) = convert_string_to_boolean(v) {
315 *v = boolean;
316 }
317 }
318 internal_query_params.tags = Some(tags_map);
319 }
320
321 internal_query_params.limit = query_params.limit;
322
323 Ok(internal_query_params)
324 }
325}
326
327#[derive(Debug, Default, PartialEq)]
328pub struct QueryTraceParams {
329 pub service_name: String,
330 pub operation_name: Option<String>,
331
332 pub limit: Option<usize>,
334
335 pub tags: Option<HashMap<String, JsonValue>>,
337
338 pub start_time: Option<i64>,
340 pub end_time: Option<i64>,
341 pub min_duration: Option<u64>,
342 pub max_duration: Option<u64>,
343}
344
345#[axum_macros::debug_handler]
347#[tracing::instrument(skip_all, fields(protocol = "jaeger", request_type = "get_services"))]
348pub async fn handle_get_services(
349 State(handler): State<JaegerQueryHandlerRef>,
350 Query(query_params): Query<JaegerQueryParams>,
351 Extension(mut query_ctx): Extension<QueryContext>,
352 TraceTableName(table_name): TraceTableName,
353) -> impl IntoResponse {
354 debug!(
355 "Received Jaeger '/api/services' request, query_params: {:?}, query_ctx: {:?}",
356 query_params, query_ctx
357 );
358
359 query_ctx.set_channel(Channel::Jaeger);
360 if let Some(table) = table_name {
361 query_ctx.set_extension(JAEGER_QUERY_TABLE_NAME_KEY, table);
362 }
363
364 let query_ctx = Arc::new(query_ctx);
365 let db = query_ctx.get_db_string();
366
367 let _timer = METRIC_JAEGER_QUERY_ELAPSED
369 .with_label_values(&[&db, "/api/services"])
370 .start_timer();
371
372 match handler.get_services(query_ctx).await {
373 Ok(output) => match covert_to_records(output).await {
374 Ok(Some(records)) => match services_from_records(records) {
375 Ok(services) => {
376 let services_num = services.len();
377 (
378 HttpStatusCode::OK,
379 axum::Json(JaegerAPIResponse {
380 data: Some(JaegerData::ServiceNames(services)),
381 total: services_num,
382 ..Default::default()
383 }),
384 )
385 }
386 Err(err) => {
387 error!("Failed to get services: {:?}", err);
388 error_response(err)
389 }
390 },
391 Ok(None) => (HttpStatusCode::OK, axum::Json(JaegerAPIResponse::default())),
392 Err(err) => {
393 error!("Failed to get services: {:?}", err);
394 error_response(err)
395 }
396 },
397 Err(err) => handle_query_error(err, "Failed to get services", &db),
398 }
399}
400
401#[axum_macros::debug_handler]
403#[tracing::instrument(skip_all, fields(protocol = "jaeger", request_type = "get_trace"))]
404pub async fn handle_get_trace(
405 State(handler): State<JaegerQueryHandlerRef>,
406 Path(trace_id): Path<String>,
407 Query(query_params): Query<JaegerQueryParams>,
408 Extension(mut query_ctx): Extension<QueryContext>,
409 TraceTableName(table_name): TraceTableName,
410) -> impl IntoResponse {
411 debug!(
412 "Received Jaeger '/api/traces/{}' request, query_params: {:?}, query_ctx: {:?}",
413 trace_id, query_params, query_ctx
414 );
415
416 update_query_context(&mut query_ctx, table_name);
417 let query_ctx = Arc::new(query_ctx);
418 let db = query_ctx.get_db_string();
419
420 let _timer = METRIC_JAEGER_QUERY_ELAPSED
422 .with_label_values(&[&db, "/api/traces"])
423 .start_timer();
424
425 let start_time_ns = query_params.start.map(|start_us| start_us * 1000);
427 let end_time_ns = query_params.end.map(|end_us| end_us * 1000);
428
429 let output = match handler
430 .get_trace(query_ctx, &trace_id, start_time_ns, end_time_ns)
431 .await
432 {
433 Ok(output) => output,
434 Err(err) => {
435 return handle_query_error(
436 err,
437 &format!("Failed to get trace for '{}'", trace_id),
438 &db,
439 );
440 }
441 };
442
443 match covert_to_records(output).await {
444 Ok(Some(records)) => match traces_from_records(records) {
445 Ok(traces) => (
446 HttpStatusCode::OK,
447 axum::Json(JaegerAPIResponse {
448 data: Some(JaegerData::Traces(traces)),
449 ..Default::default()
450 }),
451 ),
452 Err(err) => {
453 error!("Failed to get trace '{}': {:?}", trace_id, err);
454 error_response(err)
455 }
456 },
457 Ok(None) => (HttpStatusCode::OK, axum::Json(JaegerAPIResponse::default())),
458 Err(err) => {
459 error!("Failed to get trace '{}': {:?}", trace_id, err);
460 error_response(err)
461 }
462 }
463}
464
465#[axum_macros::debug_handler]
467#[tracing::instrument(skip_all, fields(protocol = "jaeger", request_type = "find_traces"))]
468pub async fn handle_find_traces(
469 State(handler): State<JaegerQueryHandlerRef>,
470 Query(query_params): Query<JaegerQueryParams>,
471 Extension(mut query_ctx): Extension<QueryContext>,
472 TraceTableName(table_name): TraceTableName,
473) -> impl IntoResponse {
474 debug!(
475 "Received Jaeger '/api/traces' request, query_params: {:?}, query_ctx: {:?}",
476 query_params, query_ctx
477 );
478
479 update_query_context(&mut query_ctx, table_name);
480 let query_ctx = Arc::new(query_ctx);
481 let db = query_ctx.get_db_string();
482
483 let _timer = METRIC_JAEGER_QUERY_ELAPSED
485 .with_label_values(&[&db, "/api/traces"])
486 .start_timer();
487
488 match QueryTraceParams::from_jaeger_query_params(query_params) {
489 Ok(query_params) => {
490 let output = handler.find_traces(query_ctx, query_params).await;
491 match output {
492 Ok(output) => match covert_to_records(output).await {
493 Ok(Some(records)) => match traces_from_records(records) {
494 Ok(traces) => (
495 HttpStatusCode::OK,
496 axum::Json(JaegerAPIResponse {
497 data: Some(JaegerData::Traces(traces)),
498 ..Default::default()
499 }),
500 ),
501 Err(err) => {
502 error!("Failed to find traces: {:?}", err);
503 error_response(err)
504 }
505 },
506 Ok(None) => (HttpStatusCode::OK, axum::Json(JaegerAPIResponse::default())),
507 Err(err) => error_response(err),
508 },
509 Err(err) => handle_query_error(err, "Failed to find traces", &db),
510 }
511 }
512 Err(e) => error_response(e),
513 }
514}
515
516#[axum_macros::debug_handler]
518#[tracing::instrument(skip_all, fields(protocol = "jaeger", request_type = "get_operations"))]
519pub async fn handle_get_operations(
520 State(handler): State<JaegerQueryHandlerRef>,
521 Query(query_params): Query<JaegerQueryParams>,
522 Extension(mut query_ctx): Extension<QueryContext>,
523 TraceTableName(table_name): TraceTableName,
524 headers: HeaderMap,
525) -> impl IntoResponse {
526 debug!(
527 "Received Jaeger '/api/operations' request, query_params: {:?}, query_ctx: {:?}, headers: {:?}",
528 query_params, query_ctx, headers
529 );
530
531 let (start, end) = match parse_jaeger_time_range_for_operations(&headers, &query_params) {
532 Ok((start, end)) => (start, end),
533 Err(e) => return error_response(e),
534 };
535
536 debug!("Get operations with start: {:?}, end: {:?}", start, end);
537
538 if let Some(service_name) = &query_params.service_name {
539 update_query_context(&mut query_ctx, table_name);
540 let query_ctx = Arc::new(query_ctx);
541 let db = query_ctx.get_db_string();
542
543 let _timer = METRIC_JAEGER_QUERY_ELAPSED
545 .with_label_values(&[&db, "/api/operations"])
546 .start_timer();
547
548 match handler
549 .get_operations(
550 query_ctx,
551 service_name,
552 query_params.span_kind.as_deref(),
553 start,
554 end,
555 )
556 .await
557 {
558 Ok(output) => match covert_to_records(output).await {
559 Ok(Some(records)) => match operations_from_records(records, true) {
560 Ok(operations) => {
561 let total = operations.len();
562 (
563 HttpStatusCode::OK,
564 axum::Json(JaegerAPIResponse {
565 data: Some(JaegerData::Operations(operations)),
566 total,
567 ..Default::default()
568 }),
569 )
570 }
571 Err(err) => {
572 error!("Failed to get operations: {:?}", err);
573 error_response(err)
574 }
575 },
576 Ok(None) => (HttpStatusCode::OK, axum::Json(JaegerAPIResponse::default())),
577 Err(err) => error_response(err),
578 },
579 Err(err) => handle_query_error(
580 err,
581 &format!("Failed to get operations for service '{}'", service_name),
582 &db,
583 ),
584 }
585 } else {
586 (
587 HttpStatusCode::BAD_REQUEST,
588 axum::Json(JaegerAPIResponse {
589 errors: vec![JaegerAPIError {
590 code: 400,
591 msg: "parameter 'service' is required".to_string(),
592 trace_id: None,
593 }],
594 ..Default::default()
595 }),
596 )
597 }
598}
599
600#[axum_macros::debug_handler]
602#[tracing::instrument(
603 skip_all,
604 fields(protocol = "jaeger", request_type = "get_operations_by_service")
605)]
606pub async fn handle_get_operations_by_service(
607 State(handler): State<JaegerQueryHandlerRef>,
608 Path(service_name): Path<String>,
609 Query(query_params): Query<JaegerQueryParams>,
610 Extension(mut query_ctx): Extension<QueryContext>,
611 TraceTableName(table_name): TraceTableName,
612 headers: HeaderMap,
613) -> impl IntoResponse {
614 debug!(
615 "Received Jaeger '/api/services/{}/operations' request, query_params: {:?}, query_ctx: {:?}, headers: {:?}",
616 service_name, query_params, query_ctx, headers
617 );
618
619 update_query_context(&mut query_ctx, table_name);
620 let query_ctx = Arc::new(query_ctx);
621 let db = query_ctx.get_db_string();
622
623 let _timer = METRIC_JAEGER_QUERY_ELAPSED
625 .with_label_values(&[&db, "/api/services"])
626 .start_timer();
627
628 let (start, end) = match parse_jaeger_time_range_for_operations(&headers, &query_params) {
629 Ok((start, end)) => (start, end),
630 Err(e) => return error_response(e),
631 };
632
633 match handler
634 .get_operations(query_ctx, &service_name, None, start, end)
635 .await
636 {
637 Ok(output) => match covert_to_records(output).await {
638 Ok(Some(records)) => match operations_from_records(records, false) {
639 Ok(operations) => {
640 let operations: Vec<String> =
641 operations.into_iter().map(|op| op.name).collect();
642 let total = operations.len();
643 (
644 HttpStatusCode::OK,
645 axum::Json(JaegerAPIResponse {
646 data: Some(JaegerData::OperationsNames(operations)),
647 total,
648 ..Default::default()
649 }),
650 )
651 }
652 Err(err) => {
653 error!(
654 "Failed to get operations for service '{}': {:?}",
655 service_name, err
656 );
657 error_response(err)
658 }
659 },
660 Ok(None) => (HttpStatusCode::OK, axum::Json(JaegerAPIResponse::default())),
661 Err(err) => error_response(err),
662 },
663 Err(err) => handle_query_error(
664 err,
665 &format!("Failed to get operations for service '{}'", service_name),
666 &db,
667 ),
668 }
669}
670
671async fn covert_to_records(output: Output) -> Result<Option<HttpRecordsOutput>> {
672 match output.data {
673 OutputData::Stream(stream) => {
674 let records = HttpRecordsOutput::try_new(
675 stream.schema().clone(),
676 util::collect(stream)
677 .await
678 .context(CollectRecordbatchSnafu)?,
679 )?;
680 debug!("The query records: {:?}", records);
681 Ok(Some(records))
682 }
683 _ => Ok(None),
685 }
686}
687
688fn handle_query_error(
689 err: Error,
690 prompt: &str,
691 db: &str,
692) -> (HttpStatusCode, axum::Json<JaegerAPIResponse>) {
693 if err.status_code() == StatusCode::TableNotFound {
695 warn!(
696 "No trace table '{}' found in database '{}'",
697 TRACE_TABLE_NAME, db
698 );
699 (HttpStatusCode::OK, axum::Json(JaegerAPIResponse::default()))
700 } else {
701 error!("{}: {:?}", prompt, err);
702 error_response(err)
703 }
704}
705
706fn error_response(err: Error) -> (HttpStatusCode, axum::Json<JaegerAPIResponse>) {
707 (
708 status_code_to_http_status(&err.status_code()),
709 axum::Json(JaegerAPIResponse {
710 errors: vec![JaegerAPIError {
711 code: err.status_code() as i32,
712 msg: err.to_string(),
713 ..Default::default()
714 }],
715 ..Default::default()
716 }),
717 )
718}
719
720fn traces_from_records(records: HttpRecordsOutput) -> Result<Vec<Trace>> {
721 let mut trace_id_to_processes: HashMap<String, HashMap<String, String>> = HashMap::new();
723 let mut trace_id_to_spans: HashMap<String, Vec<Span>> = HashMap::new();
725 let mut service_to_resource_attributes: HashMap<String, Vec<KeyValue>> = HashMap::new();
727
728 let is_span_attributes_flatten = !records
729 .schema
730 .column_schemas
731 .iter()
732 .any(|c| c.name == SPAN_ATTRIBUTES_COLUMN);
733
734 for row in records.rows.into_iter() {
735 let mut span = Span::default();
736 let mut service_name = None;
737 let mut resource_tags = vec![];
738
739 for (idx, cell) in row.into_iter().enumerate() {
740 let column_name = &records.schema.column_schemas[idx].name;
742
743 match column_name.as_str() {
744 TRACE_ID_COLUMN => {
745 if let JsonValue::String(trace_id) = cell {
746 span.trace_id = trace_id.clone();
747 trace_id_to_processes.entry(trace_id).or_default();
748 }
749 }
750 TIMESTAMP_COLUMN => {
751 span.start_time = cell.as_u64().context(InvalidJaegerQuerySnafu {
752 reason: "Failed to convert timestamp to u64".to_string(),
753 })? / 1000;
754 }
755 DURATION_NANO_COLUMN => {
756 span.duration = cell.as_u64().context(InvalidJaegerQuerySnafu {
757 reason: "Failed to convert duration to u64".to_string(),
758 })? / 1000;
759 }
760 SERVICE_NAME_COLUMN => {
761 if let JsonValue::String(name) = cell {
762 service_name = Some(name);
763 }
764 }
765 SPAN_NAME_COLUMN => {
766 if let JsonValue::String(span_name) = cell {
767 span.operation_name = span_name;
768 }
769 }
770 SPAN_ID_COLUMN => {
771 if let JsonValue::String(span_id) = cell {
772 span.span_id = span_id;
773 }
774 }
775 SPAN_ATTRIBUTES_COLUMN => {
776 if let JsonValue::Object(span_attrs) = cell {
779 span.tags.extend(object_to_tags(span_attrs));
780 }
781 }
782 RESOURCE_ATTRIBUTES_COLUMN => {
783 if let JsonValue::Object(mut resource_attrs) = cell {
787 resource_attrs.remove(KEY_SERVICE_NAME);
788 resource_tags = object_to_tags(resource_attrs);
789 }
790 }
791 PARENT_SPAN_ID_COLUMN => {
792 if let JsonValue::String(parent_span_id) = cell
793 && !parent_span_id.is_empty()
794 {
795 span.references.push(Reference {
796 trace_id: span.trace_id.clone(),
797 span_id: parent_span_id,
798 ref_type: REF_TYPE_CHILD_OF.to_string(),
799 });
800 }
801 }
802 SPAN_EVENTS_COLUMN => {
803 if let JsonValue::Array(events) = cell {
804 for event in events {
805 if let JsonValue::Object(mut obj) = event {
806 let Some(action) = obj.get("name").and_then(|v| v.as_str()) else {
807 continue;
808 };
809
810 let Some(t) =
811 obj.get("time").and_then(|t| t.as_str()).and_then(|s| {
812 SPAN_KIND_TIME_FMTS
813 .iter()
814 .find_map(|fmt| {
815 chrono::DateTime::parse_from_str(s, fmt).ok()
816 })
817 .map(|dt| dt.timestamp_micros() as u64)
818 })
819 else {
820 continue;
821 };
822
823 let mut fields = vec![KeyValue {
824 key: "event".to_string(),
825 value_type: ValueType::String,
826 value: Value::String(action.to_string()),
827 }];
828
829 if let Some(JsonValue::Object(attrs)) = obj.remove("attributes") {
831 fields.extend(object_to_tags(attrs));
832 }
833
834 span.logs.push(Log {
835 timestamp: t,
836 fields,
837 });
838 }
839 }
840 }
841 }
842 SCOPE_NAME_COLUMN => {
843 if let JsonValue::String(scope_name) = cell
844 && !scope_name.is_empty()
845 {
846 span.tags.push(KeyValue {
847 key: KEY_OTEL_SCOPE_NAME.to_string(),
848 value_type: ValueType::String,
849 value: Value::String(scope_name),
850 });
851 }
852 }
853 SCOPE_VERSION_COLUMN => {
854 if let JsonValue::String(scope_version) = cell
855 && !scope_version.is_empty()
856 {
857 span.tags.push(KeyValue {
858 key: KEY_OTEL_SCOPE_VERSION.to_string(),
859 value_type: ValueType::String,
860 value: Value::String(scope_version),
861 });
862 }
863 }
864 SPAN_KIND_COLUMN => {
865 if let JsonValue::String(span_kind) = cell
866 && !span_kind.is_empty()
867 {
868 span.tags.push(KeyValue {
869 key: KEY_SPAN_KIND.to_string(),
870 value_type: ValueType::String,
871 value: Value::String(normalize_span_kind(&span_kind)),
872 });
873 }
874 }
875 SPAN_STATUS_CODE => {
876 if let JsonValue::String(span_status) = cell
877 && span_status != SPAN_STATUS_UNSET
878 && !span_status.is_empty()
879 {
880 span.tags.push(KeyValue {
881 key: KEY_OTEL_STATUS_CODE.to_string(),
882 value_type: ValueType::String,
883 value: Value::String(normalize_status_code(&span_status)),
884 });
885 }
886 }
887
888 _ => {
889 if is_span_attributes_flatten {
891 const SPAN_ATTR_PREFIX: &str = "span_attributes.";
892 const RESOURCE_ATTR_PREFIX: &str = "resource_attributes.";
893 if column_name.starts_with(SPAN_ATTR_PREFIX) {
895 if let Some(keyvalue) = to_keyvalue(
896 column_name
897 .strip_prefix(SPAN_ATTR_PREFIX)
898 .unwrap_or_default()
899 .to_string(),
900 cell,
901 ) {
902 span.tags.push(keyvalue);
903 }
904 } else if column_name.starts_with(RESOURCE_ATTR_PREFIX)
905 && let Some(keyvalue) = to_keyvalue(
906 column_name
907 .strip_prefix(RESOURCE_ATTR_PREFIX)
908 .unwrap_or_default()
909 .to_string(),
910 cell,
911 )
912 {
913 resource_tags.push(keyvalue);
914 }
915 }
916 }
917 }
918 }
919
920 if let Some(service_name) = service_name {
921 if !service_to_resource_attributes.contains_key(&service_name) {
922 service_to_resource_attributes.insert(service_name.clone(), resource_tags);
923 }
924
925 if let Some(process) = trace_id_to_processes.get_mut(&span.trace_id) {
926 if let Some(process_id) = process.get(&service_name) {
927 span.process_id = process_id.clone();
928 } else {
929 let process_id = format!("p{}", process.len() + 1);
931 process.insert(service_name, process_id.clone());
932 span.process_id = process_id;
933 }
934 }
935 }
936
937 span.tags.sort_by(|a, b| a.key.cmp(&b.key));
939
940 if let Some(spans) = trace_id_to_spans.get_mut(&span.trace_id) {
941 spans.push(span);
942 } else {
943 trace_id_to_spans.insert(span.trace_id.clone(), vec![span]);
944 }
945 }
946
947 let mut traces = Vec::new();
948 for (trace_id, spans) in trace_id_to_spans {
949 let mut trace = Trace {
950 trace_id,
951 spans,
952 ..Default::default()
953 };
954
955 if let Some(processes) = trace_id_to_processes.remove(&trace.trace_id) {
956 let mut process_id_to_process = HashMap::new();
957 for (service_name, process_id) in processes.into_iter() {
958 let tags = service_to_resource_attributes
959 .remove(&service_name)
960 .unwrap_or_default();
961 process_id_to_process.insert(process_id, Process { service_name, tags });
962 }
963 trace.processes = process_id_to_process;
964 }
965 traces.push(trace);
966 }
967
968 Ok(traces)
969}
970
971fn to_keyvalue(key: String, value: JsonValue) -> Option<KeyValue> {
972 match value {
973 JsonValue::String(value) => Some(KeyValue {
974 key,
975 value_type: ValueType::String,
976 value: Value::String(value.to_string()),
977 }),
978 JsonValue::Number(value) => Some(KeyValue {
979 key,
980 value_type: ValueType::Int64,
981 value: Value::Int64(value.as_i64().unwrap_or(0)),
982 }),
983 JsonValue::Bool(value) => Some(KeyValue {
984 key,
985 value_type: ValueType::Boolean,
986 value: Value::Boolean(value),
987 }),
988 JsonValue::Array(value) => Some(KeyValue {
989 key,
990 value_type: ValueType::String,
991 value: Value::String(serde_json::to_string(&value).unwrap()),
992 }),
993 JsonValue::Object(value) => Some(KeyValue {
994 key,
995 value_type: ValueType::String,
996 value: Value::String(serde_json::to_string(&value).unwrap()),
997 }),
998 JsonValue::Null => None,
999 }
1000}
1001
1002fn object_to_tags(object: serde_json::map::Map<String, JsonValue>) -> Vec<KeyValue> {
1003 object
1004 .into_iter()
1005 .filter_map(|(key, value)| to_keyvalue(key, value))
1006 .collect()
1007}
1008
1009fn services_from_records(records: HttpRecordsOutput) -> Result<Vec<String>> {
1010 let expected_schema = vec![(SERVICE_NAME_COLUMN, "String")];
1011 check_schema(&records, &expected_schema)?;
1012
1013 let mut services = Vec::with_capacity(records.total_rows);
1014 for row in records.rows.into_iter() {
1015 for value in row.into_iter() {
1016 if let JsonValue::String(service_name) = value {
1017 services.push(service_name);
1018 }
1019 }
1020 }
1021 Ok(services)
1022}
1023
1024fn operations_from_records(
1026 records: HttpRecordsOutput,
1027 contain_span_kind: bool,
1028) -> Result<Vec<Operation>> {
1029 let expected_schema = vec![(SPAN_NAME_COLUMN, "String"), (SPAN_KIND_COLUMN, "String")];
1030 check_schema(&records, &expected_schema)?;
1031
1032 let mut operations = Vec::with_capacity(records.total_rows);
1033 for row in records.rows.into_iter() {
1034 let mut row_iter = row.into_iter();
1035 if let Some(JsonValue::String(operation)) = row_iter.next() {
1036 let mut operation = Operation {
1037 name: operation,
1038 span_kind: None,
1039 };
1040 if contain_span_kind {
1041 if let Some(JsonValue::String(span_kind)) = row_iter.next() {
1042 operation.span_kind = Some(normalize_span_kind(&span_kind));
1043 }
1044 } else {
1045 row_iter.next();
1047 }
1048 operations.push(operation);
1049 }
1050 }
1051
1052 Ok(operations)
1053}
1054
1055fn check_schema(records: &HttpRecordsOutput, expected_schema: &[(&str, &str)]) -> Result<()> {
1057 for (i, column) in records.schema.column_schemas.iter().enumerate() {
1058 if column.name != expected_schema[i].0 || column.data_type != expected_schema[i].1 {
1059 InvalidJaegerQuerySnafu {
1060 reason: "query result schema is not correct".to_string(),
1061 }
1062 .fail()?
1063 }
1064 }
1065 Ok(())
1066}
1067
1068fn normalize_span_kind(span_kind: &str) -> String {
1071 if let Some(stripped) = span_kind.strip_prefix(SPAN_KIND_PREFIX) {
1073 stripped.to_lowercase()
1074 } else {
1075 span_kind.to_lowercase()
1077 }
1078}
1079
1080fn normalize_status_code(status_code: &str) -> String {
1083 if let Some(stripped) = status_code.strip_prefix(SPAN_STATUS_PREFIX) {
1085 stripped.to_string()
1086 } else {
1087 status_code.to_string()
1089 }
1090}
1091
1092fn convert_string_to_number(input: &serde_json::Value) -> Option<serde_json::Value> {
1093 if let Some(data) = input.as_str() {
1094 if let Ok(number) = data.parse::<i64>() {
1095 return Some(serde_json::Value::Number(serde_json::Number::from(number)));
1096 }
1097 if let Ok(number) = data.parse::<f64>()
1098 && let Some(number) = serde_json::Number::from_f64(number)
1099 {
1100 return Some(serde_json::Value::Number(number));
1101 }
1102 }
1103
1104 None
1105}
1106
1107fn convert_string_to_boolean(input: &serde_json::Value) -> Option<serde_json::Value> {
1108 if let Some(data) = input.as_str() {
1109 if data == "true" {
1110 return Some(serde_json::Value::Bool(true));
1111 }
1112 if data == "false" {
1113 return Some(serde_json::Value::Bool(false));
1114 }
1115 }
1116
1117 None
1118}
1119
1120fn parse_jaeger_time_range_for_operations(
1121 headers: &HeaderMap,
1122 query_params: &JaegerQueryParams,
1123) -> Result<(Option<i64>, Option<i64>)> {
1124 if let Some(time_range) = headers.get(JAEGER_TIME_RANGE_FOR_OPERATIONS_HEADER) {
1125 match time_range.to_str() {
1126 Ok(time_range) => match humantime::parse_duration(time_range) {
1127 Ok(duration) => {
1128 debug!(
1129 "Get operations with time range: {:?}, duration: {:?}",
1130 time_range, duration
1131 );
1132 let now = Utc::now().timestamp_micros();
1133 Ok((Some(now - duration.as_micros() as i64), Some(now)))
1134 }
1135 Err(e) => {
1136 error!("Failed to parse time range header: {:?}", e);
1137 Err(InvalidJaegerQuerySnafu {
1138 reason: format!("invalid time range header: {:?}", time_range),
1139 }
1140 .build())
1141 }
1142 },
1143 Err(e) => {
1144 error!("Failed to convert time range header to string: {:?}", e);
1145 Err(InvalidJaegerQuerySnafu {
1146 reason: format!("invalid time range header: {:?}", time_range),
1147 }
1148 .build())
1149 }
1150 }
1151 } else {
1152 Ok((query_params.start, query_params.end))
1153 }
1154}
1155
1156#[cfg(test)]
1157mod tests {
1158 use serde_json::{Number, Value as JsonValue, json};
1159
1160 use super::*;
1161 use crate::http::{ColumnSchema, HttpRecordsOutput, OutputSchema};
1162
1163 #[test]
1164 fn test_services_from_records() {
1165 let tests = vec![(
1167 HttpRecordsOutput {
1168 schema: OutputSchema {
1169 column_schemas: vec![ColumnSchema {
1170 name: "service_name".to_string(),
1171 data_type: "String".to_string(),
1172 }],
1173 },
1174 rows: vec![
1175 vec![JsonValue::String("test-service-0".to_string())],
1176 vec![JsonValue::String("test-service-1".to_string())],
1177 ],
1178 total_rows: 2,
1179 metrics: HashMap::new(),
1180 },
1181 vec!["test-service-0".to_string(), "test-service-1".to_string()],
1182 )];
1183
1184 for (records, expected) in tests {
1185 let services = services_from_records(records).unwrap();
1186 assert_eq!(services, expected);
1187 }
1188 }
1189
1190 #[test]
1191 fn test_operations_from_records() {
1192 let tests = vec![
1194 (
1195 HttpRecordsOutput {
1196 schema: OutputSchema {
1197 column_schemas: vec![
1198 ColumnSchema {
1199 name: "span_name".to_string(),
1200 data_type: "String".to_string(),
1201 },
1202 ColumnSchema {
1203 name: "span_kind".to_string(),
1204 data_type: "String".to_string(),
1205 },
1206 ],
1207 },
1208 rows: vec![
1209 vec![
1210 JsonValue::String("access-mysql".to_string()),
1211 JsonValue::String("SPAN_KIND_SERVER".to_string()),
1212 ],
1213 vec![
1214 JsonValue::String("access-redis".to_string()),
1215 JsonValue::String("SPAN_KIND_CLIENT".to_string()),
1216 ],
1217 ],
1218 total_rows: 2,
1219 metrics: HashMap::new(),
1220 },
1221 false,
1222 vec![
1223 Operation {
1224 name: "access-mysql".to_string(),
1225 span_kind: None,
1226 },
1227 Operation {
1228 name: "access-redis".to_string(),
1229 span_kind: None,
1230 },
1231 ],
1232 ),
1233 (
1234 HttpRecordsOutput {
1235 schema: OutputSchema {
1236 column_schemas: vec![
1237 ColumnSchema {
1238 name: "span_name".to_string(),
1239 data_type: "String".to_string(),
1240 },
1241 ColumnSchema {
1242 name: "span_kind".to_string(),
1243 data_type: "String".to_string(),
1244 },
1245 ],
1246 },
1247 rows: vec![
1248 vec![
1249 JsonValue::String("access-mysql".to_string()),
1250 JsonValue::String("SPAN_KIND_SERVER".to_string()),
1251 ],
1252 vec![
1253 JsonValue::String("access-redis".to_string()),
1254 JsonValue::String("SPAN_KIND_CLIENT".to_string()),
1255 ],
1256 ],
1257 total_rows: 2,
1258 metrics: HashMap::new(),
1259 },
1260 true,
1261 vec![
1262 Operation {
1263 name: "access-mysql".to_string(),
1264 span_kind: Some("server".to_string()),
1265 },
1266 Operation {
1267 name: "access-redis".to_string(),
1268 span_kind: Some("client".to_string()),
1269 },
1270 ],
1271 ),
1272 ];
1273
1274 for (records, contain_span_kind, expected) in tests {
1275 let operations = operations_from_records(records, contain_span_kind).unwrap();
1276 assert_eq!(operations, expected);
1277 }
1278 }
1279
1280 #[test]
1281 fn test_traces_from_records() {
1282 let tests = vec![(
1284 HttpRecordsOutput {
1285 schema: OutputSchema {
1286 column_schemas: vec![
1287 ColumnSchema {
1288 name: "trace_id".to_string(),
1289 data_type: "String".to_string(),
1290 },
1291 ColumnSchema {
1292 name: "timestamp".to_string(),
1293 data_type: "TimestampNanosecond".to_string(),
1294 },
1295 ColumnSchema {
1296 name: "duration_nano".to_string(),
1297 data_type: "UInt64".to_string(),
1298 },
1299 ColumnSchema {
1300 name: "service_name".to_string(),
1301 data_type: "String".to_string(),
1302 },
1303 ColumnSchema {
1304 name: "span_name".to_string(),
1305 data_type: "String".to_string(),
1306 },
1307 ColumnSchema {
1308 name: "span_id".to_string(),
1309 data_type: "String".to_string(),
1310 },
1311 ColumnSchema {
1312 name: "span_attributes".to_string(),
1313 data_type: "Json".to_string(),
1314 },
1315 ],
1316 },
1317 rows: vec![
1318 vec![
1319 JsonValue::String("5611dce1bc9ebed65352d99a027b08ea".to_string()),
1320 JsonValue::Number(Number::from_u128(1738726754492422000).unwrap()),
1321 JsonValue::Number(Number::from_u128(100000000).unwrap()),
1322 JsonValue::String("test-service-0".to_string()),
1323 JsonValue::String("access-mysql".to_string()),
1324 JsonValue::String("008421dbbd33a3e9".to_string()),
1325 JsonValue::Object(
1326 json!({
1327 "operation.type": "access-mysql",
1328 })
1329 .as_object()
1330 .unwrap()
1331 .clone(),
1332 ),
1333 ],
1334 vec![
1335 JsonValue::String("5611dce1bc9ebed65352d99a027b08ea".to_string()),
1336 JsonValue::Number(Number::from_u128(1738726754642422000).unwrap()),
1337 JsonValue::Number(Number::from_u128(100000000).unwrap()),
1338 JsonValue::String("test-service-0".to_string()),
1339 JsonValue::String("access-redis".to_string()),
1340 JsonValue::String("ffa03416a7b9ea48".to_string()),
1341 JsonValue::Object(
1342 json!({
1343 "operation.type": "access-redis",
1344 })
1345 .as_object()
1346 .unwrap()
1347 .clone(),
1348 ),
1349 ],
1350 ],
1351 total_rows: 2,
1352 metrics: HashMap::new(),
1353 },
1354 vec![Trace {
1355 trace_id: "5611dce1bc9ebed65352d99a027b08ea".to_string(),
1356 spans: vec![
1357 Span {
1358 trace_id: "5611dce1bc9ebed65352d99a027b08ea".to_string(),
1359 span_id: "008421dbbd33a3e9".to_string(),
1360 operation_name: "access-mysql".to_string(),
1361 start_time: 1738726754492422,
1362 duration: 100000,
1363 tags: vec![KeyValue {
1364 key: "operation.type".to_string(),
1365 value_type: ValueType::String,
1366 value: Value::String("access-mysql".to_string()),
1367 }],
1368 process_id: "p1".to_string(),
1369 ..Default::default()
1370 },
1371 Span {
1372 trace_id: "5611dce1bc9ebed65352d99a027b08ea".to_string(),
1373 span_id: "ffa03416a7b9ea48".to_string(),
1374 operation_name: "access-redis".to_string(),
1375 start_time: 1738726754642422,
1376 duration: 100000,
1377 tags: vec![KeyValue {
1378 key: "operation.type".to_string(),
1379 value_type: ValueType::String,
1380 value: Value::String("access-redis".to_string()),
1381 }],
1382 process_id: "p1".to_string(),
1383 ..Default::default()
1384 },
1385 ],
1386 processes: HashMap::from([(
1387 "p1".to_string(),
1388 Process {
1389 service_name: "test-service-0".to_string(),
1390 tags: vec![],
1391 },
1392 )]),
1393 ..Default::default()
1394 }],
1395 )];
1396
1397 for (records, expected) in tests {
1398 let traces = traces_from_records(records).unwrap();
1399 assert_eq!(traces, expected);
1400 }
1401 }
1402
1403 #[test]
1404 fn test_traces_from_v1_records() {
1405 let tests = vec![(
1407 HttpRecordsOutput {
1408 schema: OutputSchema {
1409 column_schemas: vec![
1410 ColumnSchema {
1411 name: "trace_id".to_string(),
1412 data_type: "String".to_string(),
1413 },
1414 ColumnSchema {
1415 name: "timestamp".to_string(),
1416 data_type: "TimestampNanosecond".to_string(),
1417 },
1418 ColumnSchema {
1419 name: "duration_nano".to_string(),
1420 data_type: "UInt64".to_string(),
1421 },
1422 ColumnSchema {
1423 name: "service_name".to_string(),
1424 data_type: "String".to_string(),
1425 },
1426 ColumnSchema {
1427 name: "span_name".to_string(),
1428 data_type: "String".to_string(),
1429 },
1430 ColumnSchema {
1431 name: "span_id".to_string(),
1432 data_type: "String".to_string(),
1433 },
1434 ColumnSchema {
1435 name: "span_attributes.http.request.method".to_string(),
1436 data_type: "String".to_string(),
1437 },
1438 ColumnSchema {
1439 name: "span_attributes.http.request.url".to_string(),
1440 data_type: "String".to_string(),
1441 },
1442 ColumnSchema {
1443 name: "span_attributes.http.status_code".to_string(),
1444 data_type: "UInt64".to_string(),
1445 },
1446 ],
1447 },
1448 rows: vec![
1449 vec![
1450 JsonValue::String("5611dce1bc9ebed65352d99a027b08ea".to_string()),
1451 JsonValue::Number(Number::from_u128(1738726754492422000).unwrap()),
1452 JsonValue::Number(Number::from_u128(100000000).unwrap()),
1453 JsonValue::String("test-service-0".to_string()),
1454 JsonValue::String("access-mysql".to_string()),
1455 JsonValue::String("008421dbbd33a3e9".to_string()),
1456 JsonValue::String("GET".to_string()),
1457 JsonValue::String("/data".to_string()),
1458 JsonValue::Number(Number::from_u128(200).unwrap()),
1459 ],
1460 vec![
1461 JsonValue::String("5611dce1bc9ebed65352d99a027b08ea".to_string()),
1462 JsonValue::Number(Number::from_u128(1738726754642422000).unwrap()),
1463 JsonValue::Number(Number::from_u128(100000000).unwrap()),
1464 JsonValue::String("test-service-0".to_string()),
1465 JsonValue::String("access-redis".to_string()),
1466 JsonValue::String("ffa03416a7b9ea48".to_string()),
1467 JsonValue::String("POST".to_string()),
1468 JsonValue::String("/create".to_string()),
1469 JsonValue::Number(Number::from_u128(400).unwrap()),
1470 ],
1471 ],
1472 total_rows: 2,
1473 metrics: HashMap::new(),
1474 },
1475 vec![Trace {
1476 trace_id: "5611dce1bc9ebed65352d99a027b08ea".to_string(),
1477 spans: vec![
1478 Span {
1479 trace_id: "5611dce1bc9ebed65352d99a027b08ea".to_string(),
1480 span_id: "008421dbbd33a3e9".to_string(),
1481 operation_name: "access-mysql".to_string(),
1482 start_time: 1738726754492422,
1483 duration: 100000,
1484 tags: vec![
1485 KeyValue {
1486 key: "http.request.method".to_string(),
1487 value_type: ValueType::String,
1488 value: Value::String("GET".to_string()),
1489 },
1490 KeyValue {
1491 key: "http.request.url".to_string(),
1492 value_type: ValueType::String,
1493 value: Value::String("/data".to_string()),
1494 },
1495 KeyValue {
1496 key: "http.status_code".to_string(),
1497 value_type: ValueType::Int64,
1498 value: Value::Int64(200),
1499 },
1500 ],
1501 process_id: "p1".to_string(),
1502 ..Default::default()
1503 },
1504 Span {
1505 trace_id: "5611dce1bc9ebed65352d99a027b08ea".to_string(),
1506 span_id: "ffa03416a7b9ea48".to_string(),
1507 operation_name: "access-redis".to_string(),
1508 start_time: 1738726754642422,
1509 duration: 100000,
1510 tags: vec![
1511 KeyValue {
1512 key: "http.request.method".to_string(),
1513 value_type: ValueType::String,
1514 value: Value::String("POST".to_string()),
1515 },
1516 KeyValue {
1517 key: "http.request.url".to_string(),
1518 value_type: ValueType::String,
1519 value: Value::String("/create".to_string()),
1520 },
1521 KeyValue {
1522 key: "http.status_code".to_string(),
1523 value_type: ValueType::Int64,
1524 value: Value::Int64(400),
1525 },
1526 ],
1527 process_id: "p1".to_string(),
1528 ..Default::default()
1529 },
1530 ],
1531 processes: HashMap::from([(
1532 "p1".to_string(),
1533 Process {
1534 service_name: "test-service-0".to_string(),
1535 tags: vec![],
1536 },
1537 )]),
1538 ..Default::default()
1539 }],
1540 )];
1541
1542 for (records, expected) in tests {
1543 let traces = traces_from_records(records).unwrap();
1544 assert_eq!(traces, expected);
1545 }
1546 }
1547
1548 #[test]
1549 fn test_from_jaeger_query_params() {
1550 let tests = vec![
1552 (
1553 JaegerQueryParams {
1554 service_name: Some("test-service-0".to_string()),
1555 ..Default::default()
1556 },
1557 QueryTraceParams {
1558 service_name: "test-service-0".to_string(),
1559 ..Default::default()
1560 },
1561 ),
1562 (
1563 JaegerQueryParams {
1564 service_name: Some("test-service-0".to_string()),
1565 operation_name: Some("access-mysql".to_string()),
1566 start: Some(1738726754492422),
1567 end: Some(1738726754642422),
1568 max_duration: Some("100ms".to_string()),
1569 min_duration: Some("50ms".to_string()),
1570 limit: Some(10),
1571 tags: Some("{\"http.status_code\":\"200\",\"latency\":\"11.234\",\"error\":\"false\",\"http.method\":\"GET\",\"http.path\":\"/api/v1/users\"}".to_string()),
1572 ..Default::default()
1573 },
1574 QueryTraceParams {
1575 service_name: "test-service-0".to_string(),
1576 operation_name: Some("access-mysql".to_string()),
1577 start_time: Some(1738726754492422000),
1578 end_time: Some(1738726754642422000),
1579 min_duration: Some(50000000),
1580 max_duration: Some(100000000),
1581 limit: Some(10),
1582 tags: Some(HashMap::from([
1583 ("http.status_code".to_string(), JsonValue::Number(Number::from(200))),
1584 ("latency".to_string(), JsonValue::Number(Number::from_f64(11.234).unwrap())),
1585 ("error".to_string(), JsonValue::Bool(false)),
1586 ("http.method".to_string(), JsonValue::String("GET".to_string())),
1587 ("http.path".to_string(), JsonValue::String("/api/v1/users".to_string())),
1588 ])),
1589 },
1590 ),
1591 ];
1592
1593 for (query_params, expected) in tests {
1594 let query_params = QueryTraceParams::from_jaeger_query_params(query_params).unwrap();
1595 assert_eq!(query_params, expected);
1596 }
1597 }
1598
1599 #[test]
1600 fn test_check_schema() {
1601 let tests = vec![(
1603 HttpRecordsOutput {
1604 schema: OutputSchema {
1605 column_schemas: vec![
1606 ColumnSchema {
1607 name: "trace_id".to_string(),
1608 data_type: "String".to_string(),
1609 },
1610 ColumnSchema {
1611 name: "timestamp".to_string(),
1612 data_type: "TimestampNanosecond".to_string(),
1613 },
1614 ColumnSchema {
1615 name: "duration_nano".to_string(),
1616 data_type: "UInt64".to_string(),
1617 },
1618 ColumnSchema {
1619 name: "service_name".to_string(),
1620 data_type: "String".to_string(),
1621 },
1622 ColumnSchema {
1623 name: "span_name".to_string(),
1624 data_type: "String".to_string(),
1625 },
1626 ColumnSchema {
1627 name: "span_id".to_string(),
1628 data_type: "String".to_string(),
1629 },
1630 ColumnSchema {
1631 name: "span_attributes".to_string(),
1632 data_type: "Json".to_string(),
1633 },
1634 ],
1635 },
1636 rows: vec![],
1637 total_rows: 0,
1638 metrics: HashMap::new(),
1639 },
1640 vec![
1641 (TRACE_ID_COLUMN, "String"),
1642 (TIMESTAMP_COLUMN, "TimestampNanosecond"),
1643 (DURATION_NANO_COLUMN, "UInt64"),
1644 (SERVICE_NAME_COLUMN, "String"),
1645 (SPAN_NAME_COLUMN, "String"),
1646 (SPAN_ID_COLUMN, "String"),
1647 (SPAN_ATTRIBUTES_COLUMN, "Json"),
1648 ],
1649 true,
1650 )];
1651
1652 for (records, expected_schema, is_ok) in tests {
1653 let result = check_schema(&records, &expected_schema);
1654 assert_eq!(result.is_ok(), is_ok);
1655 }
1656 }
1657
1658 #[test]
1659 fn test_normalize_span_kind() {
1660 let tests = vec![
1661 ("SPAN_KIND_SERVER".to_string(), "server".to_string()),
1662 ("SPAN_KIND_CLIENT".to_string(), "client".to_string()),
1663 ];
1664
1665 for (input, expected) in tests {
1666 let result = normalize_span_kind(&input);
1667 assert_eq!(result, expected);
1668 }
1669 }
1670
1671 #[test]
1672 fn test_convert_string_to_number() {
1673 let tests = vec![
1674 (
1675 JsonValue::String("123".to_string()),
1676 Some(JsonValue::Number(Number::from(123))),
1677 ),
1678 (
1679 JsonValue::String("123.456".to_string()),
1680 Some(JsonValue::Number(Number::from_f64(123.456).unwrap())),
1681 ),
1682 ];
1683
1684 for (input, expected) in tests {
1685 let result = convert_string_to_number(&input);
1686 assert_eq!(result, expected);
1687 }
1688 }
1689
1690 #[test]
1691 fn test_convert_string_to_boolean() {
1692 let tests = vec![
1693 (
1694 JsonValue::String("true".to_string()),
1695 Some(JsonValue::Bool(true)),
1696 ),
1697 (
1698 JsonValue::String("false".to_string()),
1699 Some(JsonValue::Bool(false)),
1700 ),
1701 ];
1702
1703 for (input, expected) in tests {
1704 let result = convert_string_to_boolean(&input);
1705 assert_eq!(result, expected);
1706 }
1707 }
1708}