1use std::collections::{HashMap, HashSet};
16use std::sync::Arc;
17
18use async_trait::async_trait;
19use catalog::CatalogManagerRef;
20use common_catalog::consts::{
21 TRACE_TABLE_NAME, trace_operations_table_name, trace_services_table_name,
22};
23use common_function::function::FunctionRef;
24use common_function::scalars::json::json_get::{
25 JsonGetBool, JsonGetFloat, JsonGetInt, JsonGetString,
26};
27use common_function::scalars::udf::create_udf;
28use common_query::{Output, OutputData};
29use common_recordbatch::adapter::RecordBatchStreamAdapter;
30use common_recordbatch::util;
31use common_telemetry::warn;
32use datafusion::dataframe::DataFrame;
33use datafusion::execution::SessionStateBuilder;
34use datafusion::execution::context::SessionContext;
35use datafusion_expr::select_expr::SelectExpr;
36use datafusion_expr::{Expr, SortExpr, col, lit, lit_timestamp_nano, wildcard};
37use datatypes::value::ValueRef;
38use query::QueryEngineRef;
39use serde_json::Value as JsonValue;
40use servers::error::{
41 CatalogSnafu, CollectRecordbatchSnafu, DataFusionSnafu, Result as ServerResult,
42 TableNotFoundSnafu,
43};
44use servers::http::jaeger::{JAEGER_QUERY_TABLE_NAME_KEY, QueryTraceParams};
45use servers::otlp::trace::{
46 DURATION_NANO_COLUMN, KEY_OTEL_STATUS_ERROR_KEY, SERVICE_NAME_COLUMN, SPAN_ATTRIBUTES_COLUMN,
47 SPAN_KIND_COLUMN, SPAN_KIND_PREFIX, SPAN_NAME_COLUMN, SPAN_STATUS_CODE, SPAN_STATUS_ERROR,
48 TIMESTAMP_COLUMN, TRACE_ID_COLUMN,
49};
50use servers::query_handler::JaegerQueryHandler;
51use session::context::QueryContextRef;
52use snafu::{OptionExt, ResultExt};
53use table::requests::{TABLE_DATA_MODEL, TABLE_DATA_MODEL_TRACE_V1};
54use table::table::adapter::DfTableProviderAdapter;
55
56use crate::instance::Instance;
57
/// Maximum number of rows requested from the trace tables when a Jaeger query
/// in this file does not carry (or is not allowed to override) its own limit.
const DEFAULT_LIMIT: usize = 2_000;
59
60#[async_trait]
61impl JaegerQueryHandler for Instance {
62 async fn get_services(&self, ctx: QueryContextRef) -> ServerResult<Output> {
63 Ok(query_trace_table(
65 ctx,
66 self.catalog_manager(),
67 self.query_engine(),
68 vec![SelectExpr::from(col(SERVICE_NAME_COLUMN))],
69 vec![],
70 vec![],
71 None,
72 None,
73 vec![col(SERVICE_NAME_COLUMN)],
74 )
75 .await?)
76 }
77
78 async fn get_operations(
79 &self,
80 ctx: QueryContextRef,
81 service_name: &str,
82 span_kind: Option<&str>,
83 ) -> ServerResult<Output> {
84 let mut filters = vec![col(SERVICE_NAME_COLUMN).eq(lit(service_name))];
85
86 if let Some(span_kind) = span_kind {
87 filters.push(col(SPAN_KIND_COLUMN).eq(lit(format!(
88 "{}{}",
89 SPAN_KIND_PREFIX,
90 span_kind.to_uppercase()
91 ))));
92 }
93
94 Ok(query_trace_table(
107 ctx,
108 self.catalog_manager(),
109 self.query_engine(),
110 vec![
111 SelectExpr::from(col(SPAN_NAME_COLUMN)),
112 SelectExpr::from(col(SPAN_KIND_COLUMN)),
113 SelectExpr::from(col(SERVICE_NAME_COLUMN)),
114 SelectExpr::from(col(TIMESTAMP_COLUMN)),
115 ],
116 filters,
117 vec![col(SPAN_NAME_COLUMN).sort(true, false)], Some(DEFAULT_LIMIT),
119 None,
120 vec![col(SPAN_NAME_COLUMN), col(SPAN_KIND_COLUMN)],
121 )
122 .await?)
123 }
124
125 async fn get_trace(
126 &self,
127 ctx: QueryContextRef,
128 trace_id: &str,
129 start_time: Option<i64>,
130 end_time: Option<i64>,
131 limit: Option<usize>,
132 ) -> ServerResult<Output> {
133 let selects = vec![wildcard()];
148
149 let mut filters = vec![col(TRACE_ID_COLUMN).eq(lit(trace_id))];
150
151 if let Some(start_time) = start_time {
152 filters.push(col(TIMESTAMP_COLUMN).gt_eq(lit_timestamp_nano(start_time)));
153 }
154
155 if let Some(end_time) = end_time {
156 filters.push(col(TIMESTAMP_COLUMN).lt_eq(lit_timestamp_nano(end_time)));
157 }
158
159 let limit = if start_time.is_some() && end_time.is_some() {
160 limit
162 } else {
163 limit.or(Some(DEFAULT_LIMIT))
164 };
165
166 Ok(query_trace_table(
167 ctx,
168 self.catalog_manager(),
169 self.query_engine(),
170 selects,
171 filters,
172 vec![col(TIMESTAMP_COLUMN).sort(false, false)], limit,
174 None,
175 vec![],
176 )
177 .await?)
178 }
179
180 async fn find_traces(
181 &self,
182 ctx: QueryContextRef,
183 query_params: QueryTraceParams,
184 ) -> ServerResult<Output> {
185 let mut filters = vec![];
186
187 filters.push(col(SERVICE_NAME_COLUMN).eq(lit(query_params.service_name)));
189
190 if let Some(operation_name) = query_params.operation_name {
191 filters.push(col(SPAN_NAME_COLUMN).eq(lit(operation_name)));
192 }
193
194 if let Some(start_time) = query_params.start_time {
195 filters.push(col(TIMESTAMP_COLUMN).gt_eq(lit_timestamp_nano(start_time)));
196 }
197
198 if let Some(end_time) = query_params.end_time {
199 filters.push(col(TIMESTAMP_COLUMN).lt_eq(lit_timestamp_nano(end_time)));
200 }
201
202 if let Some(min_duration) = query_params.min_duration {
203 filters.push(col(DURATION_NANO_COLUMN).gt_eq(lit(min_duration)));
204 }
205
206 if let Some(max_duration) = query_params.max_duration {
207 filters.push(col(DURATION_NANO_COLUMN).lt_eq(lit(max_duration)));
208 }
209
210 let output = query_trace_table(
227 ctx.clone(),
228 self.catalog_manager(),
229 self.query_engine(),
230 vec![wildcard()],
231 filters,
232 vec![],
233 Some(query_params.limit.unwrap_or(DEFAULT_LIMIT)),
234 query_params.tags,
235 vec![col(TRACE_ID_COLUMN)],
236 )
237 .await?;
238
239 let mut filters = vec![
252 col(TRACE_ID_COLUMN).in_list(
253 trace_ids_from_output(output)
254 .await?
255 .iter()
256 .map(lit)
257 .collect::<Vec<Expr>>(),
258 false,
259 ),
260 ];
261
262 if let Some(start_time) = query_params.start_time {
263 filters.push(col(TIMESTAMP_COLUMN).gt_eq(lit_timestamp_nano(start_time)));
264 }
265
266 if let Some(end_time) = query_params.end_time {
267 filters.push(col(TIMESTAMP_COLUMN).lt_eq(lit_timestamp_nano(end_time)));
268 }
269
270 Ok(query_trace_table(
271 ctx,
272 self.catalog_manager(),
273 self.query_engine(),
274 vec![wildcard()],
275 filters,
276 vec![col(TIMESTAMP_COLUMN).sort(false, false)], None,
278 None,
279 vec![],
280 )
281 .await?)
282 }
283}
284
285#[allow(clippy::too_many_arguments)]
286async fn query_trace_table(
287 ctx: QueryContextRef,
288 catalog_manager: &CatalogManagerRef,
289 query_engine: &QueryEngineRef,
290 selects: Vec<SelectExpr>,
291 filters: Vec<Expr>,
292 sorts: Vec<SortExpr>,
293 limit: Option<usize>,
294 tags: Option<HashMap<String, JsonValue>>,
295 distincts: Vec<Expr>,
296) -> ServerResult<Output> {
297 let trace_table_name = ctx
298 .extension(JAEGER_QUERY_TABLE_NAME_KEY)
299 .unwrap_or(TRACE_TABLE_NAME);
300
301 let table_name = {
304 if match selects.as_slice() {
305 [SelectExpr::Expression(x)] => x == &col(SERVICE_NAME_COLUMN),
306 _ => false,
307 } {
308 &trace_services_table_name(trace_table_name)
309 } else if !distincts.is_empty()
310 && distincts.contains(&col(SPAN_NAME_COLUMN))
311 && distincts.contains(&col(SPAN_KIND_COLUMN))
312 {
313 &trace_operations_table_name(trace_table_name)
314 } else {
315 trace_table_name
316 }
317 };
318
319 let table = catalog_manager
320 .table(
321 ctx.current_catalog(),
322 &ctx.current_schema(),
323 table_name,
324 Some(&ctx),
325 )
326 .await
327 .context(CatalogSnafu)?
328 .with_context(|| TableNotFoundSnafu {
329 table: table_name,
330 catalog: ctx.current_catalog(),
331 schema: ctx.current_schema(),
332 })?;
333
334 let is_data_model_v1 = table
335 .clone()
336 .table_info()
337 .meta
338 .options
339 .extra_options
340 .get(TABLE_DATA_MODEL)
341 .map(|s| s.as_str())
342 == Some(TABLE_DATA_MODEL_TRACE_V1);
343
344 let col_names = table
346 .table_info()
347 .meta
348 .field_column_names()
349 .map(|s| format!("\"{}\"", s))
350 .collect::<HashSet<String>>();
351
352 let df_context = create_df_context(query_engine)?;
353
354 let dataframe = df_context
355 .read_table(Arc::new(DfTableProviderAdapter::new(table)))
356 .context(DataFusionSnafu)?;
357
358 let dataframe = dataframe.select(selects).context(DataFusionSnafu)?;
359
360 let dataframe = filters
362 .into_iter()
363 .chain(tags.map_or(Ok(vec![]), |t| {
364 tags_filters(&dataframe, t, is_data_model_v1, &col_names)
365 })?)
366 .try_fold(dataframe, |df, expr| {
367 df.filter(expr).context(DataFusionSnafu)
368 })?;
369
370 let dataframe = if !distincts.is_empty() {
372 dataframe
373 .distinct_on(distincts.clone(), distincts, None)
374 .context(DataFusionSnafu)?
375 } else {
376 dataframe
377 };
378
379 let dataframe = if !sorts.is_empty() {
381 dataframe.sort(sorts).context(DataFusionSnafu)?
382 } else {
383 dataframe
384 };
385
386 let dataframe = if let Some(limit) = limit {
388 dataframe.limit(0, Some(limit)).context(DataFusionSnafu)?
389 } else {
390 dataframe
391 };
392
393 let stream = dataframe.execute_stream().await.context(DataFusionSnafu)?;
395
396 let output = Output::new_with_stream(Box::pin(
397 RecordBatchStreamAdapter::try_new(stream).context(CollectRecordbatchSnafu)?,
398 ));
399
400 Ok(output)
401}
402
403fn create_df_context(query_engine: &QueryEngineRef) -> ServerResult<SessionContext> {
408 let df_context = SessionContext::new_with_state(
409 SessionStateBuilder::new_from_existing(query_engine.engine_state().session_state()).build(),
410 );
411
412 let udfs: Vec<FunctionRef> = vec![
414 Arc::new(JsonGetInt::default()),
415 Arc::new(JsonGetFloat::default()),
416 Arc::new(JsonGetBool::default()),
417 Arc::new(JsonGetString::default()),
418 ];
419
420 for udf in udfs {
421 df_context.register_udf(create_udf(udf));
422 }
423
424 Ok(df_context)
425}
426
427fn json_tag_filters(
428 dataframe: &DataFrame,
429 tags: HashMap<String, JsonValue>,
430) -> ServerResult<Vec<Expr>> {
431 let mut filters = vec![];
432
433 for (key, value) in tags.iter() {
435 if let JsonValue::String(value) = value {
436 filters.push(
437 dataframe
438 .registry()
439 .udf(JsonGetString::NAME)
440 .context(DataFusionSnafu)?
441 .call(vec![
442 col(SPAN_ATTRIBUTES_COLUMN),
443 lit(format!("[\"{}\"]", key)),
444 ])
445 .eq(lit(value)),
446 );
447 }
448 if let JsonValue::Number(value) = value {
449 if value.is_i64() {
450 filters.push(
451 dataframe
452 .registry()
453 .udf(JsonGetInt::NAME)
454 .context(DataFusionSnafu)?
455 .call(vec![
456 col(SPAN_ATTRIBUTES_COLUMN),
457 lit(format!("[\"{}\"]", key)),
458 ])
459 .eq(lit(value.as_i64().unwrap())),
460 );
461 }
462 if value.is_f64() {
463 filters.push(
464 dataframe
465 .registry()
466 .udf(JsonGetFloat::NAME)
467 .context(DataFusionSnafu)?
468 .call(vec![
469 col(SPAN_ATTRIBUTES_COLUMN),
470 lit(format!("[\"{}\"]", key)),
471 ])
472 .eq(lit(value.as_f64().unwrap())),
473 );
474 }
475 }
476 if let JsonValue::Bool(value) = value {
477 filters.push(
478 dataframe
479 .registry()
480 .udf(JsonGetBool::NAME)
481 .context(DataFusionSnafu)?
482 .call(vec![
483 col(SPAN_ATTRIBUTES_COLUMN),
484 lit(format!("[\"{}\"]", key)),
485 ])
486 .eq(lit(*value)),
487 );
488 }
489 }
490
491 Ok(filters)
492}
493
494#[inline]
497fn check_col_and_build_expr<F>(
498 span_key: String,
499 resource_key: String,
500 key: &str,
501 col_names: &HashSet<String>,
502 expr_builder: F,
503) -> Option<Expr>
504where
505 F: FnOnce(String) -> Expr,
506{
507 if col_names.contains(&span_key) {
508 return Some(expr_builder(span_key));
509 }
510 if col_names.contains(&resource_key) {
511 return Some(expr_builder(resource_key));
512 }
513 warn!("tag key {} not found in table columns", key);
514 None
515}
516
517fn flatten_tag_filters(
518 tags: HashMap<String, JsonValue>,
519 col_names: &HashSet<String>,
520) -> ServerResult<Vec<Expr>> {
521 let filters = tags
522 .into_iter()
523 .filter_map(|(key, value)| {
524 if key == KEY_OTEL_STATUS_ERROR_KEY && value == JsonValue::Bool(true) {
525 return Some(col(SPAN_STATUS_CODE).eq(lit(SPAN_STATUS_ERROR)));
526 }
527
528 let span_key = format!("\"span_attributes.{}\"", key);
530 let resource_key = format!("\"resource_attributes.{}\"", key);
531 match value {
532 JsonValue::String(value) => {
533 check_col_and_build_expr(span_key, resource_key, &key, col_names, |k| {
534 col(k).eq(lit(value))
535 })
536 }
537 JsonValue::Number(value) => {
538 if value.is_f64() {
539 let value = value.as_f64().unwrap();
541 check_col_and_build_expr(span_key, resource_key, &key, col_names, |k| {
542 col(k).eq(lit(value))
543 })
544 } else {
545 let value = value.as_i64().unwrap();
546 check_col_and_build_expr(span_key, resource_key, &key, col_names, |k| {
547 col(k).eq(lit(value))
548 })
549 }
550 }
551 JsonValue::Bool(value) => {
552 check_col_and_build_expr(span_key, resource_key, &key, col_names, |k| {
553 col(k).eq(lit(value))
554 })
555 }
556 JsonValue::Null => {
557 check_col_and_build_expr(span_key, resource_key, &key, col_names, |k| {
558 col(k).is_null()
559 })
560 }
561 JsonValue::Array(_value) => None,
563 JsonValue::Object(_value) => None,
564 }
565 })
566 .collect();
567 Ok(filters)
568}
569
570fn tags_filters(
571 dataframe: &DataFrame,
572 tags: HashMap<String, JsonValue>,
573 is_data_model_v1: bool,
574 col_names: &HashSet<String>,
575) -> ServerResult<Vec<Expr>> {
576 if is_data_model_v1 {
577 flatten_tag_filters(tags, col_names)
578 } else {
579 json_tag_filters(dataframe, tags)
580 }
581}
582
583async fn trace_ids_from_output(output: Output) -> ServerResult<Vec<String>> {
585 if let OutputData::Stream(stream) = output.data {
586 let schema = stream.schema().clone();
587 let recordbatches = util::collect(stream)
588 .await
589 .context(CollectRecordbatchSnafu)?;
590
591 if !recordbatches.is_empty()
593 && schema.num_columns() == 1
594 && schema.contains_column(TRACE_ID_COLUMN)
595 {
596 let mut trace_ids = vec![];
597 for recordbatch in recordbatches {
598 for col in recordbatch.columns().iter() {
599 for row_idx in 0..recordbatch.num_rows() {
600 if let ValueRef::String(value) = col.get_ref(row_idx) {
601 trace_ids.push(value.to_string());
602 }
603 }
604 }
605 }
606
607 return Ok(trace_ids);
608 }
609 }
610
611 Ok(vec![])
612}