1use std::collections::HashMap;
16use std::fmt;
17use std::str::FromStr;
18use std::sync::Arc;
19
20use axum::extract::{Path, Query, State};
21use axum::http::{HeaderMap, StatusCode as HttpStatusCode};
22use axum::response::IntoResponse;
23use axum::Extension;
24use chrono::Utc;
25use common_catalog::consts::{PARENT_SPAN_ID_COLUMN, TRACE_TABLE_NAME};
26use common_error::ext::ErrorExt;
27use common_error::status_code::StatusCode;
28use common_query::{Output, OutputData};
29use common_recordbatch::util;
30use common_telemetry::{debug, error, tracing, warn};
31use serde::{de, Deserialize, Deserializer, Serialize};
32use serde_json::Value as JsonValue;
33use session::context::{Channel, QueryContext};
34use snafu::{OptionExt, ResultExt};
35
36use crate::error::{
37 status_code_to_http_status, CollectRecordbatchSnafu, Error, InvalidJaegerQuerySnafu, Result,
38};
39use crate::http::extractor::TraceTableName;
40use crate::http::HttpRecordsOutput;
41use crate::metrics::METRIC_JAEGER_QUERY_ELAPSED;
42use crate::otlp::trace::{
43 DURATION_NANO_COLUMN, KEY_OTEL_SCOPE_NAME, KEY_OTEL_SCOPE_VERSION, KEY_OTEL_STATUS_CODE,
44 KEY_SERVICE_NAME, KEY_SPAN_KIND, RESOURCE_ATTRIBUTES_COLUMN, SCOPE_NAME_COLUMN,
45 SCOPE_VERSION_COLUMN, SERVICE_NAME_COLUMN, SPAN_ATTRIBUTES_COLUMN, SPAN_EVENTS_COLUMN,
46 SPAN_ID_COLUMN, SPAN_KIND_COLUMN, SPAN_KIND_PREFIX, SPAN_NAME_COLUMN, SPAN_STATUS_CODE,
47 SPAN_STATUS_PREFIX, SPAN_STATUS_UNSET, TIMESTAMP_COLUMN, TRACE_ID_COLUMN,
48};
49use crate::query_handler::JaegerQueryHandlerRef;
50
51pub const JAEGER_QUERY_TABLE_NAME_KEY: &str = "jaeger_query_table_name";
52
53const REF_TYPE_CHILD_OF: &str = "CHILD_OF";
54const SPAN_KIND_TIME_FMTS: [&str; 2] = ["%Y-%m-%d %H:%M:%S%.6f%z", "%Y-%m-%d %H:%M:%S%.9f%z"];
55pub const JAEGER_TIME_RANGE_FOR_OPERATIONS_HEADER: &str = "x-greptime-jaeger-query-time-range";
56
57#[derive(Default, Debug, Serialize, Deserialize, PartialEq)]
60pub struct JaegerAPIResponse {
61 pub data: Option<JaegerData>,
62 pub total: usize,
63 pub limit: usize,
64 pub offset: usize,
65 pub errors: Vec<JaegerAPIError>,
66}
67
68#[derive(Debug, Serialize, Deserialize, PartialEq)]
70#[serde(untagged)]
71pub enum JaegerData {
72 ServiceNames(Vec<String>),
73 OperationsNames(Vec<String>),
74 Operations(Vec<Operation>),
75 Traces(Vec<Trace>),
76}
77
78#[derive(Default, Debug, Serialize, Deserialize, PartialEq)]
80#[serde(rename_all = "camelCase")]
81pub struct JaegerAPIError {
82 pub code: i32,
83 pub msg: String,
84 #[serde(skip_serializing_if = "Option::is_none")]
85 pub trace_id: Option<String>,
86}
87
88#[derive(Debug, Default, Serialize, Deserialize, PartialEq)]
90#[serde(rename_all = "camelCase")]
91pub struct Operation {
92 pub name: String,
93 #[serde(skip_serializing_if = "Option::is_none")]
94 pub span_kind: Option<String>,
95}
96
97#[derive(Debug, Default, Serialize, Deserialize, PartialEq)]
99#[serde(rename_all = "camelCase")]
100pub struct Trace {
101 #[serde(rename = "traceID")]
102 pub trace_id: String,
103 pub spans: Vec<Span>,
104
105 #[serde(skip_serializing_if = "HashMap::is_empty")]
106 pub processes: HashMap<String, Process>,
107
108 #[serde(skip_serializing_if = "Vec::is_empty")]
109 pub warnings: Vec<String>,
110}
111
112#[derive(Debug, Default, Serialize, Deserialize, PartialEq)]
114#[serde(rename_all = "camelCase")]
115pub struct Span {
116 #[serde(rename = "traceID")]
117 pub trace_id: String,
118
119 #[serde(rename = "spanID")]
120 pub span_id: String,
121
122 #[serde(rename = "parentSpanID")]
123 #[serde(skip_serializing_if = "String::is_empty")]
124 pub parent_span_id: String,
125
126 #[serde(skip_serializing_if = "Option::is_none")]
127 pub flags: Option<u32>,
128
129 pub operation_name: String,
130 pub references: Vec<Reference>,
131 pub start_time: u64, pub duration: u64, pub tags: Vec<KeyValue>,
134 pub logs: Vec<Log>,
135
136 #[serde(rename = "processID")]
137 #[serde(skip_serializing_if = "String::is_empty")]
138 pub process_id: String,
139
140 #[serde(skip_serializing_if = "Option::is_none")]
141 pub process: Option<Process>,
142
143 #[serde(skip_serializing_if = "Vec::is_empty")]
144 pub warnings: Vec<String>,
145}
146
147#[derive(Debug, Serialize, Deserialize, PartialEq)]
149#[serde(rename_all = "camelCase")]
150pub struct Reference {
151 #[serde(rename = "traceID")]
152 pub trace_id: String,
153 #[serde(rename = "spanID")]
154 pub span_id: String,
155 pub ref_type: String,
156}
157
158#[derive(Debug, Serialize, Deserialize, PartialEq)]
160#[serde(rename_all = "camelCase")]
161pub struct Process {
162 pub service_name: String,
163 pub tags: Vec<KeyValue>,
164}
165
166#[derive(Debug, Serialize, Deserialize, PartialEq)]
168#[serde(rename_all = "camelCase")]
169pub struct Log {
170 pub timestamp: u64,
171 pub fields: Vec<KeyValue>,
172}
173
174#[derive(Debug, Serialize, Deserialize, PartialEq)]
176#[serde(rename_all = "camelCase")]
177pub struct KeyValue {
178 pub key: String,
179 #[serde(rename = "type")]
180 pub value_type: ValueType,
181 pub value: Value,
182}
183
184#[derive(Debug, Serialize, Deserialize, PartialEq)]
186#[serde(untagged)]
187#[serde(rename_all = "camelCase")]
188pub enum Value {
189 String(String),
190 Int64(i64),
191 Float64(f64),
192 Boolean(bool),
193 Binary(Vec<u8>),
194}
195
196#[derive(Debug, Serialize, Deserialize, PartialEq)]
198#[serde(rename_all = "lowercase")]
199pub enum ValueType {
200 String,
201 Int64,
202 Float64,
203 Boolean,
204 Binary,
205}
206
207#[derive(Default, Debug, Serialize, Deserialize)]
209#[serde(rename_all = "camelCase")]
210pub struct JaegerQueryParams {
211 #[serde(rename = "service")]
213 pub service_name: Option<String>,
214
215 #[serde(rename = "operation")]
217 pub operation_name: Option<String>,
218
219 #[serde(default, deserialize_with = "empty_string_as_none")]
221 pub limit: Option<usize>,
222
223 pub start: Option<i64>,
225
226 pub end: Option<i64>,
228
229 #[serde(default, deserialize_with = "empty_string_as_none")]
231 pub max_duration: Option<String>,
232
233 #[serde(default, deserialize_with = "empty_string_as_none")]
235 pub min_duration: Option<String>,
236
237 pub tags: Option<String>,
241
242 pub span_kind: Option<String>,
244}
245
246fn empty_string_as_none<'de, D, T>(de: D) -> Result<Option<T>, D::Error>
248where
249 D: Deserializer<'de>,
250 T: FromStr,
251 T::Err: fmt::Display,
252{
253 let opt = Option::<String>::deserialize(de)?;
254 match opt.as_deref() {
255 None | Some("") => Ok(None),
256 Some(s) => FromStr::from_str(s).map_err(de::Error::custom).map(Some),
257 }
258}
259
260fn update_query_context(query_ctx: &mut QueryContext, table_name: Option<String>) {
261 query_ctx.set_channel(Channel::Jaeger);
263 if let Some(table) = table_name {
264 query_ctx.set_extension(JAEGER_QUERY_TABLE_NAME_KEY, table);
265 }
266}
267
268impl QueryTraceParams {
269 fn from_jaeger_query_params(query_params: JaegerQueryParams) -> Result<Self> {
270 let mut internal_query_params: QueryTraceParams = QueryTraceParams {
271 service_name: query_params.service_name.context(InvalidJaegerQuerySnafu {
272 reason: "service_name is required".to_string(),
273 })?,
274 operation_name: query_params.operation_name,
275 start_time: query_params.start.map(|start| start * 1000),
277 end_time: query_params.end.map(|end| end * 1000),
278 ..Default::default()
279 };
280
281 if let Some(max_duration) = query_params.max_duration {
282 let duration = humantime::parse_duration(&max_duration).map_err(|e| {
283 InvalidJaegerQuerySnafu {
284 reason: format!("parse maxDuration '{}' failed: {}", max_duration, e),
285 }
286 .build()
287 })?;
288 internal_query_params.max_duration = Some(duration.as_nanos() as u64);
289 }
290
291 if let Some(min_duration) = query_params.min_duration {
292 let duration = humantime::parse_duration(&min_duration).map_err(|e| {
293 InvalidJaegerQuerySnafu {
294 reason: format!("parse minDuration '{}' failed: {}", min_duration, e),
295 }
296 .build()
297 })?;
298 internal_query_params.min_duration = Some(duration.as_nanos() as u64);
299 }
300
301 if let Some(tags) = query_params.tags {
302 let mut tags_map: HashMap<String, JsonValue> =
304 serde_json::from_str(&tags).map_err(|e| {
305 InvalidJaegerQuerySnafu {
306 reason: format!("parse tags '{}' failed: {}", tags, e),
307 }
308 .build()
309 })?;
310 for (_, v) in tags_map.iter_mut() {
311 if let Some(number) = convert_string_to_number(v) {
312 *v = number;
313 }
314 if let Some(boolean) = convert_string_to_boolean(v) {
315 *v = boolean;
316 }
317 }
318 internal_query_params.tags = Some(tags_map);
319 }
320
321 internal_query_params.limit = query_params.limit;
322
323 Ok(internal_query_params)
324 }
325}
326
327#[derive(Debug, Default, PartialEq)]
328pub struct QueryTraceParams {
329 pub service_name: String,
330 pub operation_name: Option<String>,
331
332 pub limit: Option<usize>,
334
335 pub tags: Option<HashMap<String, JsonValue>>,
337
338 pub start_time: Option<i64>,
340 pub end_time: Option<i64>,
341 pub min_duration: Option<u64>,
342 pub max_duration: Option<u64>,
343}
344
345#[axum_macros::debug_handler]
347#[tracing::instrument(skip_all, fields(protocol = "jaeger", request_type = "get_services"))]
348pub async fn handle_get_services(
349 State(handler): State<JaegerQueryHandlerRef>,
350 Query(query_params): Query<JaegerQueryParams>,
351 Extension(mut query_ctx): Extension<QueryContext>,
352 TraceTableName(table_name): TraceTableName,
353) -> impl IntoResponse {
354 debug!(
355 "Received Jaeger '/api/services' request, query_params: {:?}, query_ctx: {:?}",
356 query_params, query_ctx
357 );
358
359 query_ctx.set_channel(Channel::Jaeger);
360 if let Some(table) = table_name {
361 query_ctx.set_extension(JAEGER_QUERY_TABLE_NAME_KEY, table);
362 }
363
364 let query_ctx = Arc::new(query_ctx);
365 let db = query_ctx.get_db_string();
366
367 let _timer = METRIC_JAEGER_QUERY_ELAPSED
369 .with_label_values(&[&db, "/api/services"])
370 .start_timer();
371
372 match handler.get_services(query_ctx).await {
373 Ok(output) => match covert_to_records(output).await {
374 Ok(Some(records)) => match services_from_records(records) {
375 Ok(services) => {
376 let services_num = services.len();
377 (
378 HttpStatusCode::OK,
379 axum::Json(JaegerAPIResponse {
380 data: Some(JaegerData::ServiceNames(services)),
381 total: services_num,
382 ..Default::default()
383 }),
384 )
385 }
386 Err(err) => {
387 error!("Failed to get services: {:?}", err);
388 error_response(err)
389 }
390 },
391 Ok(None) => (HttpStatusCode::OK, axum::Json(JaegerAPIResponse::default())),
392 Err(err) => {
393 error!("Failed to get services: {:?}", err);
394 error_response(err)
395 }
396 },
397 Err(err) => handle_query_error(err, "Failed to get services", &db),
398 }
399}
400
401#[axum_macros::debug_handler]
403#[tracing::instrument(skip_all, fields(protocol = "jaeger", request_type = "get_trace"))]
404pub async fn handle_get_trace(
405 State(handler): State<JaegerQueryHandlerRef>,
406 Path(trace_id): Path<String>,
407 Query(query_params): Query<JaegerQueryParams>,
408 Extension(mut query_ctx): Extension<QueryContext>,
409 TraceTableName(table_name): TraceTableName,
410) -> impl IntoResponse {
411 debug!(
412 "Received Jaeger '/api/traces/{}' request, query_params: {:?}, query_ctx: {:?}",
413 trace_id, query_params, query_ctx
414 );
415
416 update_query_context(&mut query_ctx, table_name);
417 let query_ctx = Arc::new(query_ctx);
418 let db = query_ctx.get_db_string();
419
420 let _timer = METRIC_JAEGER_QUERY_ELAPSED
422 .with_label_values(&[&db, "/api/traces"])
423 .start_timer();
424
425 let start_time_ns = query_params.start.map(|start_us| start_us * 1000);
427 let end_time_ns = query_params.end.map(|end_us| end_us * 1000);
428
429 let output = match handler
430 .get_trace(query_ctx, &trace_id, start_time_ns, end_time_ns)
431 .await
432 {
433 Ok(output) => output,
434 Err(err) => {
435 return handle_query_error(
436 err,
437 &format!("Failed to get trace for '{}'", trace_id),
438 &db,
439 );
440 }
441 };
442
443 match covert_to_records(output).await {
444 Ok(Some(records)) => match traces_from_records(records) {
445 Ok(traces) => (
446 HttpStatusCode::OK,
447 axum::Json(JaegerAPIResponse {
448 data: Some(JaegerData::Traces(traces)),
449 ..Default::default()
450 }),
451 ),
452 Err(err) => {
453 error!("Failed to get trace '{}': {:?}", trace_id, err);
454 error_response(err)
455 }
456 },
457 Ok(None) => (HttpStatusCode::OK, axum::Json(JaegerAPIResponse::default())),
458 Err(err) => {
459 error!("Failed to get trace '{}': {:?}", trace_id, err);
460 error_response(err)
461 }
462 }
463}
464
465#[axum_macros::debug_handler]
467#[tracing::instrument(skip_all, fields(protocol = "jaeger", request_type = "find_traces"))]
468pub async fn handle_find_traces(
469 State(handler): State<JaegerQueryHandlerRef>,
470 Query(query_params): Query<JaegerQueryParams>,
471 Extension(mut query_ctx): Extension<QueryContext>,
472 TraceTableName(table_name): TraceTableName,
473) -> impl IntoResponse {
474 debug!(
475 "Received Jaeger '/api/traces' request, query_params: {:?}, query_ctx: {:?}",
476 query_params, query_ctx
477 );
478
479 update_query_context(&mut query_ctx, table_name);
480 let query_ctx = Arc::new(query_ctx);
481 let db = query_ctx.get_db_string();
482
483 let _timer = METRIC_JAEGER_QUERY_ELAPSED
485 .with_label_values(&[&db, "/api/traces"])
486 .start_timer();
487
488 match QueryTraceParams::from_jaeger_query_params(query_params) {
489 Ok(query_params) => {
490 let output = handler.find_traces(query_ctx, query_params).await;
491 match output {
492 Ok(output) => match covert_to_records(output).await {
493 Ok(Some(records)) => match traces_from_records(records) {
494 Ok(traces) => (
495 HttpStatusCode::OK,
496 axum::Json(JaegerAPIResponse {
497 data: Some(JaegerData::Traces(traces)),
498 ..Default::default()
499 }),
500 ),
501 Err(err) => {
502 error!("Failed to find traces: {:?}", err);
503 error_response(err)
504 }
505 },
506 Ok(None) => (HttpStatusCode::OK, axum::Json(JaegerAPIResponse::default())),
507 Err(err) => error_response(err),
508 },
509 Err(err) => handle_query_error(err, "Failed to find traces", &db),
510 }
511 }
512 Err(e) => error_response(e),
513 }
514}
515
516#[axum_macros::debug_handler]
518#[tracing::instrument(skip_all, fields(protocol = "jaeger", request_type = "get_operations"))]
519pub async fn handle_get_operations(
520 State(handler): State<JaegerQueryHandlerRef>,
521 Query(query_params): Query<JaegerQueryParams>,
522 Extension(mut query_ctx): Extension<QueryContext>,
523 TraceTableName(table_name): TraceTableName,
524 headers: HeaderMap,
525) -> impl IntoResponse {
526 debug!(
527 "Received Jaeger '/api/operations' request, query_params: {:?}, query_ctx: {:?}, headers: {:?}",
528 query_params, query_ctx, headers
529 );
530
531 let (start, end) = match parse_jaeger_time_range_for_operations(&headers, &query_params) {
532 Ok((start, end)) => (start, end),
533 Err(e) => return error_response(e),
534 };
535
536 debug!("Get operations with start: {:?}, end: {:?}", start, end);
537
538 if let Some(service_name) = &query_params.service_name {
539 update_query_context(&mut query_ctx, table_name);
540 let query_ctx = Arc::new(query_ctx);
541 let db = query_ctx.get_db_string();
542
543 let _timer = METRIC_JAEGER_QUERY_ELAPSED
545 .with_label_values(&[&db, "/api/operations"])
546 .start_timer();
547
548 match handler
549 .get_operations(
550 query_ctx,
551 service_name,
552 query_params.span_kind.as_deref(),
553 start,
554 end,
555 )
556 .await
557 {
558 Ok(output) => match covert_to_records(output).await {
559 Ok(Some(records)) => match operations_from_records(records, true) {
560 Ok(operations) => {
561 let total = operations.len();
562 (
563 HttpStatusCode::OK,
564 axum::Json(JaegerAPIResponse {
565 data: Some(JaegerData::Operations(operations)),
566 total,
567 ..Default::default()
568 }),
569 )
570 }
571 Err(err) => {
572 error!("Failed to get operations: {:?}", err);
573 error_response(err)
574 }
575 },
576 Ok(None) => (HttpStatusCode::OK, axum::Json(JaegerAPIResponse::default())),
577 Err(err) => error_response(err),
578 },
579 Err(err) => handle_query_error(
580 err,
581 &format!("Failed to get operations for service '{}'", service_name),
582 &db,
583 ),
584 }
585 } else {
586 (
587 HttpStatusCode::BAD_REQUEST,
588 axum::Json(JaegerAPIResponse {
589 errors: vec![JaegerAPIError {
590 code: 400,
591 msg: "parameter 'service' is required".to_string(),
592 trace_id: None,
593 }],
594 ..Default::default()
595 }),
596 )
597 }
598}
599
600#[axum_macros::debug_handler]
602#[tracing::instrument(
603 skip_all,
604 fields(protocol = "jaeger", request_type = "get_operations_by_service")
605)]
606pub async fn handle_get_operations_by_service(
607 State(handler): State<JaegerQueryHandlerRef>,
608 Path(service_name): Path<String>,
609 Query(query_params): Query<JaegerQueryParams>,
610 Extension(mut query_ctx): Extension<QueryContext>,
611 TraceTableName(table_name): TraceTableName,
612 headers: HeaderMap,
613) -> impl IntoResponse {
614 debug!(
615 "Received Jaeger '/api/services/{}/operations' request, query_params: {:?}, query_ctx: {:?}, headers: {:?}",
616 service_name, query_params, query_ctx, headers
617 );
618
619 update_query_context(&mut query_ctx, table_name);
620 let query_ctx = Arc::new(query_ctx);
621 let db = query_ctx.get_db_string();
622
623 let _timer = METRIC_JAEGER_QUERY_ELAPSED
625 .with_label_values(&[&db, "/api/services"])
626 .start_timer();
627
628 let (start, end) = match parse_jaeger_time_range_for_operations(&headers, &query_params) {
629 Ok((start, end)) => (start, end),
630 Err(e) => return error_response(e),
631 };
632
633 match handler
634 .get_operations(query_ctx, &service_name, None, start, end)
635 .await
636 {
637 Ok(output) => match covert_to_records(output).await {
638 Ok(Some(records)) => match operations_from_records(records, false) {
639 Ok(operations) => {
640 let operations: Vec<String> =
641 operations.into_iter().map(|op| op.name).collect();
642 let total = operations.len();
643 (
644 HttpStatusCode::OK,
645 axum::Json(JaegerAPIResponse {
646 data: Some(JaegerData::OperationsNames(operations)),
647 total,
648 ..Default::default()
649 }),
650 )
651 }
652 Err(err) => {
653 error!(
654 "Failed to get operations for service '{}': {:?}",
655 service_name, err
656 );
657 error_response(err)
658 }
659 },
660 Ok(None) => (HttpStatusCode::OK, axum::Json(JaegerAPIResponse::default())),
661 Err(err) => error_response(err),
662 },
663 Err(err) => handle_query_error(
664 err,
665 &format!("Failed to get operations for service '{}'", service_name),
666 &db,
667 ),
668 }
669}
670
671async fn covert_to_records(output: Output) -> Result<Option<HttpRecordsOutput>> {
672 match output.data {
673 OutputData::Stream(stream) => {
674 let records = HttpRecordsOutput::try_new(
675 stream.schema().clone(),
676 util::collect(stream)
677 .await
678 .context(CollectRecordbatchSnafu)?,
679 )?;
680 debug!("The query records: {:?}", records);
681 Ok(Some(records))
682 }
683 _ => Ok(None),
685 }
686}
687
688fn handle_query_error(
689 err: Error,
690 prompt: &str,
691 db: &str,
692) -> (HttpStatusCode, axum::Json<JaegerAPIResponse>) {
693 if err.status_code() == StatusCode::TableNotFound {
695 warn!(
696 "No trace table '{}' found in database '{}'",
697 TRACE_TABLE_NAME, db
698 );
699 (HttpStatusCode::OK, axum::Json(JaegerAPIResponse::default()))
700 } else {
701 error!("{}: {:?}", prompt, err);
702 error_response(err)
703 }
704}
705
706fn error_response(err: Error) -> (HttpStatusCode, axum::Json<JaegerAPIResponse>) {
707 (
708 status_code_to_http_status(&err.status_code()),
709 axum::Json(JaegerAPIResponse {
710 errors: vec![JaegerAPIError {
711 code: err.status_code() as i32,
712 msg: err.to_string(),
713 ..Default::default()
714 }],
715 ..Default::default()
716 }),
717 )
718}
719
720fn traces_from_records(records: HttpRecordsOutput) -> Result<Vec<Trace>> {
721 let mut trace_id_to_processes: HashMap<String, HashMap<String, String>> = HashMap::new();
723 let mut trace_id_to_spans: HashMap<String, Vec<Span>> = HashMap::new();
725 let mut service_to_resource_attributes: HashMap<String, Vec<KeyValue>> = HashMap::new();
727
728 let is_span_attributes_flatten = !records
729 .schema
730 .column_schemas
731 .iter()
732 .any(|c| c.name == SPAN_ATTRIBUTES_COLUMN);
733
734 for row in records.rows.into_iter() {
735 let mut span = Span::default();
736 let mut service_name = None;
737 let mut resource_tags = vec![];
738
739 for (idx, cell) in row.into_iter().enumerate() {
740 let column_name = &records.schema.column_schemas[idx].name;
742
743 match column_name.as_str() {
744 TRACE_ID_COLUMN => {
745 if let JsonValue::String(trace_id) = cell {
746 span.trace_id = trace_id.clone();
747 trace_id_to_processes.entry(trace_id).or_default();
748 }
749 }
750 TIMESTAMP_COLUMN => {
751 span.start_time = cell.as_u64().context(InvalidJaegerQuerySnafu {
752 reason: "Failed to convert timestamp to u64".to_string(),
753 })? / 1000;
754 }
755 DURATION_NANO_COLUMN => {
756 span.duration = cell.as_u64().context(InvalidJaegerQuerySnafu {
757 reason: "Failed to convert duration to u64".to_string(),
758 })? / 1000;
759 }
760 SERVICE_NAME_COLUMN => {
761 if let JsonValue::String(name) = cell {
762 service_name = Some(name);
763 }
764 }
765 SPAN_NAME_COLUMN => {
766 if let JsonValue::String(span_name) = cell {
767 span.operation_name = span_name;
768 }
769 }
770 SPAN_ID_COLUMN => {
771 if let JsonValue::String(span_id) = cell {
772 span.span_id = span_id;
773 }
774 }
775 SPAN_ATTRIBUTES_COLUMN => {
776 if let JsonValue::Object(span_attrs) = cell {
779 span.tags.extend(object_to_tags(span_attrs));
780 }
781 }
782 RESOURCE_ATTRIBUTES_COLUMN => {
783 if let JsonValue::Object(mut resource_attrs) = cell {
787 resource_attrs.remove(KEY_SERVICE_NAME);
788 resource_tags = object_to_tags(resource_attrs);
789 }
790 }
791 PARENT_SPAN_ID_COLUMN => {
792 if let JsonValue::String(parent_span_id) = cell {
793 if !parent_span_id.is_empty() {
794 span.references.push(Reference {
795 trace_id: span.trace_id.clone(),
796 span_id: parent_span_id,
797 ref_type: REF_TYPE_CHILD_OF.to_string(),
798 });
799 }
800 }
801 }
802 SPAN_EVENTS_COLUMN => {
803 if let JsonValue::Array(events) = cell {
804 for event in events {
805 if let JsonValue::Object(mut obj) = event {
806 let Some(action) = obj.get("name").and_then(|v| v.as_str()) else {
807 continue;
808 };
809
810 let Some(t) =
811 obj.get("time").and_then(|t| t.as_str()).and_then(|s| {
812 SPAN_KIND_TIME_FMTS
813 .iter()
814 .find_map(|fmt| {
815 chrono::DateTime::parse_from_str(s, fmt).ok()
816 })
817 .map(|dt| dt.timestamp_micros() as u64)
818 })
819 else {
820 continue;
821 };
822
823 let mut fields = vec![KeyValue {
824 key: "event".to_string(),
825 value_type: ValueType::String,
826 value: Value::String(action.to_string()),
827 }];
828
829 if let Some(JsonValue::Object(attrs)) = obj.remove("attributes") {
831 fields.extend(object_to_tags(attrs));
832 }
833
834 span.logs.push(Log {
835 timestamp: t,
836 fields,
837 });
838 }
839 }
840 }
841 }
842 SCOPE_NAME_COLUMN => {
843 if let JsonValue::String(scope_name) = cell {
844 if !scope_name.is_empty() {
845 span.tags.push(KeyValue {
846 key: KEY_OTEL_SCOPE_NAME.to_string(),
847 value_type: ValueType::String,
848 value: Value::String(scope_name),
849 });
850 }
851 }
852 }
853 SCOPE_VERSION_COLUMN => {
854 if let JsonValue::String(scope_version) = cell {
855 if !scope_version.is_empty() {
856 span.tags.push(KeyValue {
857 key: KEY_OTEL_SCOPE_VERSION.to_string(),
858 value_type: ValueType::String,
859 value: Value::String(scope_version),
860 });
861 }
862 }
863 }
864 SPAN_KIND_COLUMN => {
865 if let JsonValue::String(span_kind) = cell {
866 if !span_kind.is_empty() {
867 span.tags.push(KeyValue {
868 key: KEY_SPAN_KIND.to_string(),
869 value_type: ValueType::String,
870 value: Value::String(normalize_span_kind(&span_kind)),
871 });
872 }
873 }
874 }
875 SPAN_STATUS_CODE => {
876 if let JsonValue::String(span_status) = cell {
877 if span_status != SPAN_STATUS_UNSET && !span_status.is_empty() {
878 span.tags.push(KeyValue {
879 key: KEY_OTEL_STATUS_CODE.to_string(),
880 value_type: ValueType::String,
881 value: Value::String(normalize_status_code(&span_status)),
882 });
883 }
884 }
885 }
886
887 _ => {
888 if is_span_attributes_flatten {
890 const SPAN_ATTR_PREFIX: &str = "span_attributes.";
891 const RESOURCE_ATTR_PREFIX: &str = "resource_attributes.";
892 if column_name.starts_with(SPAN_ATTR_PREFIX) {
894 if let Some(keyvalue) = to_keyvalue(
895 column_name
896 .strip_prefix(SPAN_ATTR_PREFIX)
897 .unwrap_or_default()
898 .to_string(),
899 cell,
900 ) {
901 span.tags.push(keyvalue);
902 }
903 } else if column_name.starts_with(RESOURCE_ATTR_PREFIX) {
904 if let Some(keyvalue) = to_keyvalue(
905 column_name
906 .strip_prefix(RESOURCE_ATTR_PREFIX)
907 .unwrap_or_default()
908 .to_string(),
909 cell,
910 ) {
911 resource_tags.push(keyvalue);
912 }
913 }
914 }
915 }
916 }
917 }
918
919 if let Some(service_name) = service_name {
920 if !service_to_resource_attributes.contains_key(&service_name) {
921 service_to_resource_attributes.insert(service_name.clone(), resource_tags);
922 }
923
924 if let Some(process) = trace_id_to_processes.get_mut(&span.trace_id) {
925 if let Some(process_id) = process.get(&service_name) {
926 span.process_id = process_id.clone();
927 } else {
928 let process_id = format!("p{}", process.len() + 1);
930 process.insert(service_name, process_id.clone());
931 span.process_id = process_id;
932 }
933 }
934 }
935
936 span.tags.sort_by(|a, b| a.key.cmp(&b.key));
938
939 if let Some(spans) = trace_id_to_spans.get_mut(&span.trace_id) {
940 spans.push(span);
941 } else {
942 trace_id_to_spans.insert(span.trace_id.clone(), vec![span]);
943 }
944 }
945
946 let mut traces = Vec::new();
947 for (trace_id, spans) in trace_id_to_spans {
948 let mut trace = Trace {
949 trace_id,
950 spans,
951 ..Default::default()
952 };
953
954 if let Some(processes) = trace_id_to_processes.remove(&trace.trace_id) {
955 let mut process_id_to_process = HashMap::new();
956 for (service_name, process_id) in processes.into_iter() {
957 let tags = service_to_resource_attributes
958 .remove(&service_name)
959 .unwrap_or_default();
960 process_id_to_process.insert(process_id, Process { service_name, tags });
961 }
962 trace.processes = process_id_to_process;
963 }
964 traces.push(trace);
965 }
966
967 Ok(traces)
968}
969
970fn to_keyvalue(key: String, value: JsonValue) -> Option<KeyValue> {
971 match value {
972 JsonValue::String(value) => Some(KeyValue {
973 key,
974 value_type: ValueType::String,
975 value: Value::String(value.to_string()),
976 }),
977 JsonValue::Number(value) => Some(KeyValue {
978 key,
979 value_type: ValueType::Int64,
980 value: Value::Int64(value.as_i64().unwrap_or(0)),
981 }),
982 JsonValue::Bool(value) => Some(KeyValue {
983 key,
984 value_type: ValueType::Boolean,
985 value: Value::Boolean(value),
986 }),
987 JsonValue::Array(value) => Some(KeyValue {
988 key,
989 value_type: ValueType::String,
990 value: Value::String(serde_json::to_string(&value).unwrap()),
991 }),
992 JsonValue::Object(value) => Some(KeyValue {
993 key,
994 value_type: ValueType::String,
995 value: Value::String(serde_json::to_string(&value).unwrap()),
996 }),
997 JsonValue::Null => None,
998 }
999}
1000
1001fn object_to_tags(object: serde_json::map::Map<String, JsonValue>) -> Vec<KeyValue> {
1002 object
1003 .into_iter()
1004 .filter_map(|(key, value)| to_keyvalue(key, value))
1005 .collect()
1006}
1007
1008fn services_from_records(records: HttpRecordsOutput) -> Result<Vec<String>> {
1009 let expected_schema = vec![(SERVICE_NAME_COLUMN, "String")];
1010 check_schema(&records, &expected_schema)?;
1011
1012 let mut services = Vec::with_capacity(records.total_rows);
1013 for row in records.rows.into_iter() {
1014 for value in row.into_iter() {
1015 if let JsonValue::String(service_name) = value {
1016 services.push(service_name);
1017 }
1018 }
1019 }
1020 Ok(services)
1021}
1022
1023fn operations_from_records(
1025 records: HttpRecordsOutput,
1026 contain_span_kind: bool,
1027) -> Result<Vec<Operation>> {
1028 let expected_schema = vec![(SPAN_NAME_COLUMN, "String"), (SPAN_KIND_COLUMN, "String")];
1029 check_schema(&records, &expected_schema)?;
1030
1031 let mut operations = Vec::with_capacity(records.total_rows);
1032 for row in records.rows.into_iter() {
1033 let mut row_iter = row.into_iter();
1034 if let Some(JsonValue::String(operation)) = row_iter.next() {
1035 let mut operation = Operation {
1036 name: operation,
1037 span_kind: None,
1038 };
1039 if contain_span_kind {
1040 if let Some(JsonValue::String(span_kind)) = row_iter.next() {
1041 operation.span_kind = Some(normalize_span_kind(&span_kind));
1042 }
1043 } else {
1044 row_iter.next();
1046 }
1047 operations.push(operation);
1048 }
1049 }
1050
1051 Ok(operations)
1052}
1053
1054fn check_schema(records: &HttpRecordsOutput, expected_schema: &[(&str, &str)]) -> Result<()> {
1056 for (i, column) in records.schema.column_schemas.iter().enumerate() {
1057 if column.name != expected_schema[i].0 || column.data_type != expected_schema[i].1 {
1058 InvalidJaegerQuerySnafu {
1059 reason: "query result schema is not correct".to_string(),
1060 }
1061 .fail()?
1062 }
1063 }
1064 Ok(())
1065}
1066
1067fn normalize_span_kind(span_kind: &str) -> String {
1070 if let Some(stripped) = span_kind.strip_prefix(SPAN_KIND_PREFIX) {
1072 stripped.to_lowercase()
1073 } else {
1074 span_kind.to_lowercase()
1076 }
1077}
1078
1079fn normalize_status_code(status_code: &str) -> String {
1082 if let Some(stripped) = status_code.strip_prefix(SPAN_STATUS_PREFIX) {
1084 stripped.to_string()
1085 } else {
1086 status_code.to_string()
1088 }
1089}
1090
1091fn convert_string_to_number(input: &serde_json::Value) -> Option<serde_json::Value> {
1092 if let Some(data) = input.as_str() {
1093 if let Ok(number) = data.parse::<i64>() {
1094 return Some(serde_json::Value::Number(serde_json::Number::from(number)));
1095 }
1096 if let Ok(number) = data.parse::<f64>() {
1097 if let Some(number) = serde_json::Number::from_f64(number) {
1098 return Some(serde_json::Value::Number(number));
1099 }
1100 }
1101 }
1102
1103 None
1104}
1105
1106fn convert_string_to_boolean(input: &serde_json::Value) -> Option<serde_json::Value> {
1107 if let Some(data) = input.as_str() {
1108 if data == "true" {
1109 return Some(serde_json::Value::Bool(true));
1110 }
1111 if data == "false" {
1112 return Some(serde_json::Value::Bool(false));
1113 }
1114 }
1115
1116 None
1117}
1118
1119fn parse_jaeger_time_range_for_operations(
1120 headers: &HeaderMap,
1121 query_params: &JaegerQueryParams,
1122) -> Result<(Option<i64>, Option<i64>)> {
1123 if let Some(time_range) = headers.get(JAEGER_TIME_RANGE_FOR_OPERATIONS_HEADER) {
1124 match time_range.to_str() {
1125 Ok(time_range) => match humantime::parse_duration(time_range) {
1126 Ok(duration) => {
1127 debug!(
1128 "Get operations with time range: {:?}, duration: {:?}",
1129 time_range, duration
1130 );
1131 let now = Utc::now().timestamp_micros();
1132 Ok((Some(now - duration.as_micros() as i64), Some(now)))
1133 }
1134 Err(e) => {
1135 error!("Failed to parse time range header: {:?}", e);
1136 Err(InvalidJaegerQuerySnafu {
1137 reason: format!("invalid time range header: {:?}", time_range),
1138 }
1139 .build())
1140 }
1141 },
1142 Err(e) => {
1143 error!("Failed to convert time range header to string: {:?}", e);
1144 Err(InvalidJaegerQuerySnafu {
1145 reason: format!("invalid time range header: {:?}", time_range),
1146 }
1147 .build())
1148 }
1149 }
1150 } else {
1151 Ok((query_params.start, query_params.end))
1152 }
1153}
1154
1155#[cfg(test)]
1156mod tests {
1157 use serde_json::{json, Number, Value as JsonValue};
1158
1159 use super::*;
1160 use crate::http::{ColumnSchema, HttpRecordsOutput, OutputSchema};
1161
1162 #[test]
1163 fn test_services_from_records() {
1164 let tests = vec![(
1166 HttpRecordsOutput {
1167 schema: OutputSchema {
1168 column_schemas: vec![ColumnSchema {
1169 name: "service_name".to_string(),
1170 data_type: "String".to_string(),
1171 }],
1172 },
1173 rows: vec![
1174 vec![JsonValue::String("test-service-0".to_string())],
1175 vec![JsonValue::String("test-service-1".to_string())],
1176 ],
1177 total_rows: 2,
1178 metrics: HashMap::new(),
1179 },
1180 vec!["test-service-0".to_string(), "test-service-1".to_string()],
1181 )];
1182
1183 for (records, expected) in tests {
1184 let services = services_from_records(records).unwrap();
1185 assert_eq!(services, expected);
1186 }
1187 }
1188
1189 #[test]
1190 fn test_operations_from_records() {
1191 let tests = vec![
1193 (
1194 HttpRecordsOutput {
1195 schema: OutputSchema {
1196 column_schemas: vec![
1197 ColumnSchema {
1198 name: "span_name".to_string(),
1199 data_type: "String".to_string(),
1200 },
1201 ColumnSchema {
1202 name: "span_kind".to_string(),
1203 data_type: "String".to_string(),
1204 },
1205 ],
1206 },
1207 rows: vec![
1208 vec![
1209 JsonValue::String("access-mysql".to_string()),
1210 JsonValue::String("SPAN_KIND_SERVER".to_string()),
1211 ],
1212 vec![
1213 JsonValue::String("access-redis".to_string()),
1214 JsonValue::String("SPAN_KIND_CLIENT".to_string()),
1215 ],
1216 ],
1217 total_rows: 2,
1218 metrics: HashMap::new(),
1219 },
1220 false,
1221 vec![
1222 Operation {
1223 name: "access-mysql".to_string(),
1224 span_kind: None,
1225 },
1226 Operation {
1227 name: "access-redis".to_string(),
1228 span_kind: None,
1229 },
1230 ],
1231 ),
1232 (
1233 HttpRecordsOutput {
1234 schema: OutputSchema {
1235 column_schemas: vec![
1236 ColumnSchema {
1237 name: "span_name".to_string(),
1238 data_type: "String".to_string(),
1239 },
1240 ColumnSchema {
1241 name: "span_kind".to_string(),
1242 data_type: "String".to_string(),
1243 },
1244 ],
1245 },
1246 rows: vec![
1247 vec![
1248 JsonValue::String("access-mysql".to_string()),
1249 JsonValue::String("SPAN_KIND_SERVER".to_string()),
1250 ],
1251 vec![
1252 JsonValue::String("access-redis".to_string()),
1253 JsonValue::String("SPAN_KIND_CLIENT".to_string()),
1254 ],
1255 ],
1256 total_rows: 2,
1257 metrics: HashMap::new(),
1258 },
1259 true,
1260 vec![
1261 Operation {
1262 name: "access-mysql".to_string(),
1263 span_kind: Some("server".to_string()),
1264 },
1265 Operation {
1266 name: "access-redis".to_string(),
1267 span_kind: Some("client".to_string()),
1268 },
1269 ],
1270 ),
1271 ];
1272
1273 for (records, contain_span_kind, expected) in tests {
1274 let operations = operations_from_records(records, contain_span_kind).unwrap();
1275 assert_eq!(operations, expected);
1276 }
1277 }
1278
1279 #[test]
1280 fn test_traces_from_records() {
1281 let tests = vec![(
1283 HttpRecordsOutput {
1284 schema: OutputSchema {
1285 column_schemas: vec![
1286 ColumnSchema {
1287 name: "trace_id".to_string(),
1288 data_type: "String".to_string(),
1289 },
1290 ColumnSchema {
1291 name: "timestamp".to_string(),
1292 data_type: "TimestampNanosecond".to_string(),
1293 },
1294 ColumnSchema {
1295 name: "duration_nano".to_string(),
1296 data_type: "UInt64".to_string(),
1297 },
1298 ColumnSchema {
1299 name: "service_name".to_string(),
1300 data_type: "String".to_string(),
1301 },
1302 ColumnSchema {
1303 name: "span_name".to_string(),
1304 data_type: "String".to_string(),
1305 },
1306 ColumnSchema {
1307 name: "span_id".to_string(),
1308 data_type: "String".to_string(),
1309 },
1310 ColumnSchema {
1311 name: "span_attributes".to_string(),
1312 data_type: "Json".to_string(),
1313 },
1314 ],
1315 },
1316 rows: vec![
1317 vec![
1318 JsonValue::String("5611dce1bc9ebed65352d99a027b08ea".to_string()),
1319 JsonValue::Number(Number::from_u128(1738726754492422000).unwrap()),
1320 JsonValue::Number(Number::from_u128(100000000).unwrap()),
1321 JsonValue::String("test-service-0".to_string()),
1322 JsonValue::String("access-mysql".to_string()),
1323 JsonValue::String("008421dbbd33a3e9".to_string()),
1324 JsonValue::Object(
1325 json!({
1326 "operation.type": "access-mysql",
1327 })
1328 .as_object()
1329 .unwrap()
1330 .clone(),
1331 ),
1332 ],
1333 vec![
1334 JsonValue::String("5611dce1bc9ebed65352d99a027b08ea".to_string()),
1335 JsonValue::Number(Number::from_u128(1738726754642422000).unwrap()),
1336 JsonValue::Number(Number::from_u128(100000000).unwrap()),
1337 JsonValue::String("test-service-0".to_string()),
1338 JsonValue::String("access-redis".to_string()),
1339 JsonValue::String("ffa03416a7b9ea48".to_string()),
1340 JsonValue::Object(
1341 json!({
1342 "operation.type": "access-redis",
1343 })
1344 .as_object()
1345 .unwrap()
1346 .clone(),
1347 ),
1348 ],
1349 ],
1350 total_rows: 2,
1351 metrics: HashMap::new(),
1352 },
1353 vec![Trace {
1354 trace_id: "5611dce1bc9ebed65352d99a027b08ea".to_string(),
1355 spans: vec![
1356 Span {
1357 trace_id: "5611dce1bc9ebed65352d99a027b08ea".to_string(),
1358 span_id: "008421dbbd33a3e9".to_string(),
1359 operation_name: "access-mysql".to_string(),
1360 start_time: 1738726754492422,
1361 duration: 100000,
1362 tags: vec![KeyValue {
1363 key: "operation.type".to_string(),
1364 value_type: ValueType::String,
1365 value: Value::String("access-mysql".to_string()),
1366 }],
1367 process_id: "p1".to_string(),
1368 ..Default::default()
1369 },
1370 Span {
1371 trace_id: "5611dce1bc9ebed65352d99a027b08ea".to_string(),
1372 span_id: "ffa03416a7b9ea48".to_string(),
1373 operation_name: "access-redis".to_string(),
1374 start_time: 1738726754642422,
1375 duration: 100000,
1376 tags: vec![KeyValue {
1377 key: "operation.type".to_string(),
1378 value_type: ValueType::String,
1379 value: Value::String("access-redis".to_string()),
1380 }],
1381 process_id: "p1".to_string(),
1382 ..Default::default()
1383 },
1384 ],
1385 processes: HashMap::from([(
1386 "p1".to_string(),
1387 Process {
1388 service_name: "test-service-0".to_string(),
1389 tags: vec![],
1390 },
1391 )]),
1392 ..Default::default()
1393 }],
1394 )];
1395
1396 for (records, expected) in tests {
1397 let traces = traces_from_records(records).unwrap();
1398 assert_eq!(traces, expected);
1399 }
1400 }
1401
1402 #[test]
1403 fn test_traces_from_v1_records() {
1404 let tests = vec![(
1406 HttpRecordsOutput {
1407 schema: OutputSchema {
1408 column_schemas: vec![
1409 ColumnSchema {
1410 name: "trace_id".to_string(),
1411 data_type: "String".to_string(),
1412 },
1413 ColumnSchema {
1414 name: "timestamp".to_string(),
1415 data_type: "TimestampNanosecond".to_string(),
1416 },
1417 ColumnSchema {
1418 name: "duration_nano".to_string(),
1419 data_type: "UInt64".to_string(),
1420 },
1421 ColumnSchema {
1422 name: "service_name".to_string(),
1423 data_type: "String".to_string(),
1424 },
1425 ColumnSchema {
1426 name: "span_name".to_string(),
1427 data_type: "String".to_string(),
1428 },
1429 ColumnSchema {
1430 name: "span_id".to_string(),
1431 data_type: "String".to_string(),
1432 },
1433 ColumnSchema {
1434 name: "span_attributes.http.request.method".to_string(),
1435 data_type: "String".to_string(),
1436 },
1437 ColumnSchema {
1438 name: "span_attributes.http.request.url".to_string(),
1439 data_type: "String".to_string(),
1440 },
1441 ColumnSchema {
1442 name: "span_attributes.http.status_code".to_string(),
1443 data_type: "UInt64".to_string(),
1444 },
1445 ],
1446 },
1447 rows: vec![
1448 vec![
1449 JsonValue::String("5611dce1bc9ebed65352d99a027b08ea".to_string()),
1450 JsonValue::Number(Number::from_u128(1738726754492422000).unwrap()),
1451 JsonValue::Number(Number::from_u128(100000000).unwrap()),
1452 JsonValue::String("test-service-0".to_string()),
1453 JsonValue::String("access-mysql".to_string()),
1454 JsonValue::String("008421dbbd33a3e9".to_string()),
1455 JsonValue::String("GET".to_string()),
1456 JsonValue::String("/data".to_string()),
1457 JsonValue::Number(Number::from_u128(200).unwrap()),
1458 ],
1459 vec![
1460 JsonValue::String("5611dce1bc9ebed65352d99a027b08ea".to_string()),
1461 JsonValue::Number(Number::from_u128(1738726754642422000).unwrap()),
1462 JsonValue::Number(Number::from_u128(100000000).unwrap()),
1463 JsonValue::String("test-service-0".to_string()),
1464 JsonValue::String("access-redis".to_string()),
1465 JsonValue::String("ffa03416a7b9ea48".to_string()),
1466 JsonValue::String("POST".to_string()),
1467 JsonValue::String("/create".to_string()),
1468 JsonValue::Number(Number::from_u128(400).unwrap()),
1469 ],
1470 ],
1471 total_rows: 2,
1472 metrics: HashMap::new(),
1473 },
1474 vec![Trace {
1475 trace_id: "5611dce1bc9ebed65352d99a027b08ea".to_string(),
1476 spans: vec![
1477 Span {
1478 trace_id: "5611dce1bc9ebed65352d99a027b08ea".to_string(),
1479 span_id: "008421dbbd33a3e9".to_string(),
1480 operation_name: "access-mysql".to_string(),
1481 start_time: 1738726754492422,
1482 duration: 100000,
1483 tags: vec![
1484 KeyValue {
1485 key: "http.request.method".to_string(),
1486 value_type: ValueType::String,
1487 value: Value::String("GET".to_string()),
1488 },
1489 KeyValue {
1490 key: "http.request.url".to_string(),
1491 value_type: ValueType::String,
1492 value: Value::String("/data".to_string()),
1493 },
1494 KeyValue {
1495 key: "http.status_code".to_string(),
1496 value_type: ValueType::Int64,
1497 value: Value::Int64(200),
1498 },
1499 ],
1500 process_id: "p1".to_string(),
1501 ..Default::default()
1502 },
1503 Span {
1504 trace_id: "5611dce1bc9ebed65352d99a027b08ea".to_string(),
1505 span_id: "ffa03416a7b9ea48".to_string(),
1506 operation_name: "access-redis".to_string(),
1507 start_time: 1738726754642422,
1508 duration: 100000,
1509 tags: vec![
1510 KeyValue {
1511 key: "http.request.method".to_string(),
1512 value_type: ValueType::String,
1513 value: Value::String("POST".to_string()),
1514 },
1515 KeyValue {
1516 key: "http.request.url".to_string(),
1517 value_type: ValueType::String,
1518 value: Value::String("/create".to_string()),
1519 },
1520 KeyValue {
1521 key: "http.status_code".to_string(),
1522 value_type: ValueType::Int64,
1523 value: Value::Int64(400),
1524 },
1525 ],
1526 process_id: "p1".to_string(),
1527 ..Default::default()
1528 },
1529 ],
1530 processes: HashMap::from([(
1531 "p1".to_string(),
1532 Process {
1533 service_name: "test-service-0".to_string(),
1534 tags: vec![],
1535 },
1536 )]),
1537 ..Default::default()
1538 }],
1539 )];
1540
1541 for (records, expected) in tests {
1542 let traces = traces_from_records(records).unwrap();
1543 assert_eq!(traces, expected);
1544 }
1545 }
1546
1547 #[test]
1548 fn test_from_jaeger_query_params() {
1549 let tests = vec![
1551 (
1552 JaegerQueryParams {
1553 service_name: Some("test-service-0".to_string()),
1554 ..Default::default()
1555 },
1556 QueryTraceParams {
1557 service_name: "test-service-0".to_string(),
1558 ..Default::default()
1559 },
1560 ),
1561 (
1562 JaegerQueryParams {
1563 service_name: Some("test-service-0".to_string()),
1564 operation_name: Some("access-mysql".to_string()),
1565 start: Some(1738726754492422),
1566 end: Some(1738726754642422),
1567 max_duration: Some("100ms".to_string()),
1568 min_duration: Some("50ms".to_string()),
1569 limit: Some(10),
1570 tags: Some("{\"http.status_code\":\"200\",\"latency\":\"11.234\",\"error\":\"false\",\"http.method\":\"GET\",\"http.path\":\"/api/v1/users\"}".to_string()),
1571 ..Default::default()
1572 },
1573 QueryTraceParams {
1574 service_name: "test-service-0".to_string(),
1575 operation_name: Some("access-mysql".to_string()),
1576 start_time: Some(1738726754492422000),
1577 end_time: Some(1738726754642422000),
1578 min_duration: Some(50000000),
1579 max_duration: Some(100000000),
1580 limit: Some(10),
1581 tags: Some(HashMap::from([
1582 ("http.status_code".to_string(), JsonValue::Number(Number::from(200))),
1583 ("latency".to_string(), JsonValue::Number(Number::from_f64(11.234).unwrap())),
1584 ("error".to_string(), JsonValue::Bool(false)),
1585 ("http.method".to_string(), JsonValue::String("GET".to_string())),
1586 ("http.path".to_string(), JsonValue::String("/api/v1/users".to_string())),
1587 ])),
1588 },
1589 ),
1590 ];
1591
1592 for (query_params, expected) in tests {
1593 let query_params = QueryTraceParams::from_jaeger_query_params(query_params).unwrap();
1594 assert_eq!(query_params, expected);
1595 }
1596 }
1597
1598 #[test]
1599 fn test_check_schema() {
1600 let tests = vec![(
1602 HttpRecordsOutput {
1603 schema: OutputSchema {
1604 column_schemas: vec![
1605 ColumnSchema {
1606 name: "trace_id".to_string(),
1607 data_type: "String".to_string(),
1608 },
1609 ColumnSchema {
1610 name: "timestamp".to_string(),
1611 data_type: "TimestampNanosecond".to_string(),
1612 },
1613 ColumnSchema {
1614 name: "duration_nano".to_string(),
1615 data_type: "UInt64".to_string(),
1616 },
1617 ColumnSchema {
1618 name: "service_name".to_string(),
1619 data_type: "String".to_string(),
1620 },
1621 ColumnSchema {
1622 name: "span_name".to_string(),
1623 data_type: "String".to_string(),
1624 },
1625 ColumnSchema {
1626 name: "span_id".to_string(),
1627 data_type: "String".to_string(),
1628 },
1629 ColumnSchema {
1630 name: "span_attributes".to_string(),
1631 data_type: "Json".to_string(),
1632 },
1633 ],
1634 },
1635 rows: vec![],
1636 total_rows: 0,
1637 metrics: HashMap::new(),
1638 },
1639 vec![
1640 (TRACE_ID_COLUMN, "String"),
1641 (TIMESTAMP_COLUMN, "TimestampNanosecond"),
1642 (DURATION_NANO_COLUMN, "UInt64"),
1643 (SERVICE_NAME_COLUMN, "String"),
1644 (SPAN_NAME_COLUMN, "String"),
1645 (SPAN_ID_COLUMN, "String"),
1646 (SPAN_ATTRIBUTES_COLUMN, "Json"),
1647 ],
1648 true,
1649 )];
1650
1651 for (records, expected_schema, is_ok) in tests {
1652 let result = check_schema(&records, &expected_schema);
1653 assert_eq!(result.is_ok(), is_ok);
1654 }
1655 }
1656
1657 #[test]
1658 fn test_normalize_span_kind() {
1659 let tests = vec![
1660 ("SPAN_KIND_SERVER".to_string(), "server".to_string()),
1661 ("SPAN_KIND_CLIENT".to_string(), "client".to_string()),
1662 ];
1663
1664 for (input, expected) in tests {
1665 let result = normalize_span_kind(&input);
1666 assert_eq!(result, expected);
1667 }
1668 }
1669
1670 #[test]
1671 fn test_convert_string_to_number() {
1672 let tests = vec![
1673 (
1674 JsonValue::String("123".to_string()),
1675 Some(JsonValue::Number(Number::from(123))),
1676 ),
1677 (
1678 JsonValue::String("123.456".to_string()),
1679 Some(JsonValue::Number(Number::from_f64(123.456).unwrap())),
1680 ),
1681 ];
1682
1683 for (input, expected) in tests {
1684 let result = convert_string_to_number(&input);
1685 assert_eq!(result, expected);
1686 }
1687 }
1688
1689 #[test]
1690 fn test_convert_string_to_boolean() {
1691 let tests = vec![
1692 (
1693 JsonValue::String("true".to_string()),
1694 Some(JsonValue::Bool(true)),
1695 ),
1696 (
1697 JsonValue::String("false".to_string()),
1698 Some(JsonValue::Bool(false)),
1699 ),
1700 ];
1701
1702 for (input, expected) in tests {
1703 let result = convert_string_to_boolean(&input);
1704 assert_eq!(result, expected);
1705 }
1706 }
1707}