1use std::collections::{HashMap, HashSet};
16use std::sync::Arc;
17
18use async_trait::async_trait;
19use catalog::CatalogManagerRef;
20use common_catalog::consts::{
21 TRACE_TABLE_NAME, trace_operations_table_name, trace_services_table_name,
22};
23use common_function::function::FunctionRef;
24use common_function::scalars::json::json_get::{
25 JsonGetBool, JsonGetFloat, JsonGetInt, JsonGetString,
26};
27use common_function::scalars::udf::create_udf;
28use common_query::{Output, OutputData};
29use common_recordbatch::adapter::RecordBatchStreamAdapter;
30use common_recordbatch::util;
31use common_telemetry::warn;
32use datafusion::dataframe::DataFrame;
33use datafusion::execution::SessionStateBuilder;
34use datafusion::execution::context::SessionContext;
35use datafusion_expr::select_expr::SelectExpr;
36use datafusion_expr::{Expr, SortExpr, col, lit, lit_timestamp_nano, wildcard};
37use datatypes::value::ValueRef;
38use query::QueryEngineRef;
39use serde_json::Value as JsonValue;
40use servers::error::{
41 CatalogSnafu, CollectRecordbatchSnafu, DataFusionSnafu, Result as ServerResult,
42 TableNotFoundSnafu,
43};
44use servers::http::jaeger::{JAEGER_QUERY_TABLE_NAME_KEY, QueryTraceParams};
45use servers::otlp::trace::{
46 DURATION_NANO_COLUMN, KEY_OTEL_STATUS_ERROR_KEY, SERVICE_NAME_COLUMN, SPAN_ATTRIBUTES_COLUMN,
47 SPAN_KIND_COLUMN, SPAN_KIND_PREFIX, SPAN_NAME_COLUMN, SPAN_STATUS_CODE, SPAN_STATUS_ERROR,
48 TIMESTAMP_COLUMN, TRACE_ID_COLUMN,
49};
50use servers::query_handler::JaegerQueryHandler;
51use session::context::QueryContextRef;
52use snafu::{OptionExt, ResultExt};
53use table::requests::{TABLE_DATA_MODEL, TABLE_DATA_MODEL_TRACE_V1};
54use table::table::adapter::DfTableProviderAdapter;
55
56use crate::instance::Instance;
57
/// Row limit used by the handlers in this file: `get_operations` and `get_trace` always apply
/// it, and `find_traces` falls back to it when the request carries no explicit limit.
const DEFAULT_LIMIT: usize = 2000;
59
60#[async_trait]
61impl JaegerQueryHandler for Instance {
62 async fn get_services(&self, ctx: QueryContextRef) -> ServerResult<Output> {
63 Ok(query_trace_table(
65 ctx,
66 self.catalog_manager(),
67 self.query_engine(),
68 vec![SelectExpr::from(col(SERVICE_NAME_COLUMN))],
69 vec![],
70 vec![],
71 None,
72 None,
73 vec![col(SERVICE_NAME_COLUMN)],
74 )
75 .await?)
76 }
77
78 async fn get_operations(
79 &self,
80 ctx: QueryContextRef,
81 service_name: &str,
82 span_kind: Option<&str>,
83 ) -> ServerResult<Output> {
84 let mut filters = vec![col(SERVICE_NAME_COLUMN).eq(lit(service_name))];
85
86 if let Some(span_kind) = span_kind {
87 filters.push(col(SPAN_KIND_COLUMN).eq(lit(format!(
88 "{}{}",
89 SPAN_KIND_PREFIX,
90 span_kind.to_uppercase()
91 ))));
92 }
93
94 Ok(query_trace_table(
107 ctx,
108 self.catalog_manager(),
109 self.query_engine(),
110 vec![
111 SelectExpr::from(col(SPAN_NAME_COLUMN)),
112 SelectExpr::from(col(SPAN_KIND_COLUMN)),
113 SelectExpr::from(col(SERVICE_NAME_COLUMN)),
114 SelectExpr::from(col(TIMESTAMP_COLUMN)),
115 ],
116 filters,
117 vec![col(SPAN_NAME_COLUMN).sort(true, false)], Some(DEFAULT_LIMIT),
119 None,
120 vec![col(SPAN_NAME_COLUMN), col(SPAN_KIND_COLUMN)],
121 )
122 .await?)
123 }
124
125 async fn get_trace(
126 &self,
127 ctx: QueryContextRef,
128 trace_id: &str,
129 start_time: Option<i64>,
130 end_time: Option<i64>,
131 ) -> ServerResult<Output> {
132 let selects = vec![wildcard()];
147
148 let mut filters = vec![col(TRACE_ID_COLUMN).eq(lit(trace_id))];
149
150 if let Some(start_time) = start_time {
151 filters.push(col(TIMESTAMP_COLUMN).gt_eq(lit_timestamp_nano(start_time)));
152 }
153
154 if let Some(end_time) = end_time {
155 filters.push(col(TIMESTAMP_COLUMN).lt_eq(lit_timestamp_nano(end_time)));
156 }
157
158 Ok(query_trace_table(
159 ctx,
160 self.catalog_manager(),
161 self.query_engine(),
162 selects,
163 filters,
164 vec![col(TIMESTAMP_COLUMN).sort(false, false)], Some(DEFAULT_LIMIT),
166 None,
167 vec![],
168 )
169 .await?)
170 }
171
172 async fn find_traces(
173 &self,
174 ctx: QueryContextRef,
175 query_params: QueryTraceParams,
176 ) -> ServerResult<Output> {
177 let mut filters = vec![];
178
179 filters.push(col(SERVICE_NAME_COLUMN).eq(lit(query_params.service_name)));
181
182 if let Some(operation_name) = query_params.operation_name {
183 filters.push(col(SPAN_NAME_COLUMN).eq(lit(operation_name)));
184 }
185
186 if let Some(start_time) = query_params.start_time {
187 filters.push(col(TIMESTAMP_COLUMN).gt_eq(lit_timestamp_nano(start_time)));
188 }
189
190 if let Some(end_time) = query_params.end_time {
191 filters.push(col(TIMESTAMP_COLUMN).lt_eq(lit_timestamp_nano(end_time)));
192 }
193
194 if let Some(min_duration) = query_params.min_duration {
195 filters.push(col(DURATION_NANO_COLUMN).gt_eq(lit(min_duration)));
196 }
197
198 if let Some(max_duration) = query_params.max_duration {
199 filters.push(col(DURATION_NANO_COLUMN).lt_eq(lit(max_duration)));
200 }
201
202 let output = query_trace_table(
219 ctx.clone(),
220 self.catalog_manager(),
221 self.query_engine(),
222 vec![wildcard()],
223 filters,
224 vec![],
225 Some(query_params.limit.unwrap_or(DEFAULT_LIMIT)),
226 query_params.tags,
227 vec![col(TRACE_ID_COLUMN)],
228 )
229 .await?;
230
231 let mut filters = vec![
244 col(TRACE_ID_COLUMN).in_list(
245 trace_ids_from_output(output)
246 .await?
247 .iter()
248 .map(lit)
249 .collect::<Vec<Expr>>(),
250 false,
251 ),
252 ];
253
254 if let Some(start_time) = query_params.start_time {
255 filters.push(col(TIMESTAMP_COLUMN).gt_eq(lit_timestamp_nano(start_time)));
256 }
257
258 if let Some(end_time) = query_params.end_time {
259 filters.push(col(TIMESTAMP_COLUMN).lt_eq(lit_timestamp_nano(end_time)));
260 }
261
262 Ok(query_trace_table(
263 ctx,
264 self.catalog_manager(),
265 self.query_engine(),
266 vec![wildcard()],
267 filters,
268 vec![col(TIMESTAMP_COLUMN).sort(false, false)], None,
270 None,
271 vec![],
272 )
273 .await?)
274 }
275}
276
277#[allow(clippy::too_many_arguments)]
278async fn query_trace_table(
279 ctx: QueryContextRef,
280 catalog_manager: &CatalogManagerRef,
281 query_engine: &QueryEngineRef,
282 selects: Vec<SelectExpr>,
283 filters: Vec<Expr>,
284 sorts: Vec<SortExpr>,
285 limit: Option<usize>,
286 tags: Option<HashMap<String, JsonValue>>,
287 distincts: Vec<Expr>,
288) -> ServerResult<Output> {
289 let trace_table_name = ctx
290 .extension(JAEGER_QUERY_TABLE_NAME_KEY)
291 .unwrap_or(TRACE_TABLE_NAME);
292
293 let table_name = {
296 if match selects.as_slice() {
297 [SelectExpr::Expression(x)] => x == &col(SERVICE_NAME_COLUMN),
298 _ => false,
299 } {
300 &trace_services_table_name(trace_table_name)
301 } else if !distincts.is_empty()
302 && distincts.contains(&col(SPAN_NAME_COLUMN))
303 && distincts.contains(&col(SPAN_KIND_COLUMN))
304 {
305 &trace_operations_table_name(trace_table_name)
306 } else {
307 trace_table_name
308 }
309 };
310
311 let table = catalog_manager
312 .table(
313 ctx.current_catalog(),
314 &ctx.current_schema(),
315 table_name,
316 Some(&ctx),
317 )
318 .await
319 .context(CatalogSnafu)?
320 .with_context(|| TableNotFoundSnafu {
321 table: table_name,
322 catalog: ctx.current_catalog(),
323 schema: ctx.current_schema(),
324 })?;
325
326 let is_data_model_v1 = table
327 .clone()
328 .table_info()
329 .meta
330 .options
331 .extra_options
332 .get(TABLE_DATA_MODEL)
333 .map(|s| s.as_str())
334 == Some(TABLE_DATA_MODEL_TRACE_V1);
335
336 let col_names = table
338 .table_info()
339 .meta
340 .field_column_names()
341 .map(|s| format!("\"{}\"", s))
342 .collect::<HashSet<String>>();
343
344 let df_context = create_df_context(query_engine)?;
345
346 let dataframe = df_context
347 .read_table(Arc::new(DfTableProviderAdapter::new(table)))
348 .context(DataFusionSnafu)?;
349
350 let dataframe = dataframe.select(selects).context(DataFusionSnafu)?;
351
352 let dataframe = filters
354 .into_iter()
355 .chain(tags.map_or(Ok(vec![]), |t| {
356 tags_filters(&dataframe, t, is_data_model_v1, &col_names)
357 })?)
358 .try_fold(dataframe, |df, expr| {
359 df.filter(expr).context(DataFusionSnafu)
360 })?;
361
362 let dataframe = if !distincts.is_empty() {
364 dataframe
365 .distinct_on(distincts.clone(), distincts, None)
366 .context(DataFusionSnafu)?
367 } else {
368 dataframe
369 };
370
371 let dataframe = if !sorts.is_empty() {
373 dataframe.sort(sorts).context(DataFusionSnafu)?
374 } else {
375 dataframe
376 };
377
378 let dataframe = if let Some(limit) = limit {
380 dataframe.limit(0, Some(limit)).context(DataFusionSnafu)?
381 } else {
382 dataframe
383 };
384
385 let stream = dataframe.execute_stream().await.context(DataFusionSnafu)?;
387
388 let output = Output::new_with_stream(Box::pin(
389 RecordBatchStreamAdapter::try_new(stream).context(CollectRecordbatchSnafu)?,
390 ));
391
392 Ok(output)
393}
394
395fn create_df_context(query_engine: &QueryEngineRef) -> ServerResult<SessionContext> {
400 let df_context = SessionContext::new_with_state(
401 SessionStateBuilder::new_from_existing(query_engine.engine_state().session_state()).build(),
402 );
403
404 let udfs: Vec<FunctionRef> = vec![
406 Arc::new(JsonGetInt::default()),
407 Arc::new(JsonGetFloat::default()),
408 Arc::new(JsonGetBool::default()),
409 Arc::new(JsonGetString::default()),
410 ];
411
412 for udf in udfs {
413 df_context.register_udf(create_udf(udf));
414 }
415
416 Ok(df_context)
417}
418
419fn json_tag_filters(
420 dataframe: &DataFrame,
421 tags: HashMap<String, JsonValue>,
422) -> ServerResult<Vec<Expr>> {
423 let mut filters = vec![];
424
425 for (key, value) in tags.iter() {
427 if let JsonValue::String(value) = value {
428 filters.push(
429 dataframe
430 .registry()
431 .udf(JsonGetString::NAME)
432 .context(DataFusionSnafu)?
433 .call(vec![
434 col(SPAN_ATTRIBUTES_COLUMN),
435 lit(format!("[\"{}\"]", key)),
436 ])
437 .eq(lit(value)),
438 );
439 }
440 if let JsonValue::Number(value) = value {
441 if value.is_i64() {
442 filters.push(
443 dataframe
444 .registry()
445 .udf(JsonGetInt::NAME)
446 .context(DataFusionSnafu)?
447 .call(vec![
448 col(SPAN_ATTRIBUTES_COLUMN),
449 lit(format!("[\"{}\"]", key)),
450 ])
451 .eq(lit(value.as_i64().unwrap())),
452 );
453 }
454 if value.is_f64() {
455 filters.push(
456 dataframe
457 .registry()
458 .udf(JsonGetFloat::NAME)
459 .context(DataFusionSnafu)?
460 .call(vec![
461 col(SPAN_ATTRIBUTES_COLUMN),
462 lit(format!("[\"{}\"]", key)),
463 ])
464 .eq(lit(value.as_f64().unwrap())),
465 );
466 }
467 }
468 if let JsonValue::Bool(value) = value {
469 filters.push(
470 dataframe
471 .registry()
472 .udf(JsonGetBool::NAME)
473 .context(DataFusionSnafu)?
474 .call(vec![
475 col(SPAN_ATTRIBUTES_COLUMN),
476 lit(format!("[\"{}\"]", key)),
477 ])
478 .eq(lit(*value)),
479 );
480 }
481 }
482
483 Ok(filters)
484}
485
486#[inline]
489fn check_col_and_build_expr<F>(
490 span_key: String,
491 resource_key: String,
492 key: &str,
493 col_names: &HashSet<String>,
494 expr_builder: F,
495) -> Option<Expr>
496where
497 F: FnOnce(String) -> Expr,
498{
499 if col_names.contains(&span_key) {
500 return Some(expr_builder(span_key));
501 }
502 if col_names.contains(&resource_key) {
503 return Some(expr_builder(resource_key));
504 }
505 warn!("tag key {} not found in table columns", key);
506 None
507}
508
509fn flatten_tag_filters(
510 tags: HashMap<String, JsonValue>,
511 col_names: &HashSet<String>,
512) -> ServerResult<Vec<Expr>> {
513 let filters = tags
514 .into_iter()
515 .filter_map(|(key, value)| {
516 if key == KEY_OTEL_STATUS_ERROR_KEY && value == JsonValue::Bool(true) {
517 return Some(col(SPAN_STATUS_CODE).eq(lit(SPAN_STATUS_ERROR)));
518 }
519
520 let span_key = format!("\"span_attributes.{}\"", key);
522 let resource_key = format!("\"resource_attributes.{}\"", key);
523 match value {
524 JsonValue::String(value) => {
525 check_col_and_build_expr(span_key, resource_key, &key, col_names, |k| {
526 col(k).eq(lit(value))
527 })
528 }
529 JsonValue::Number(value) => {
530 if value.is_f64() {
531 let value = value.as_f64().unwrap();
533 check_col_and_build_expr(span_key, resource_key, &key, col_names, |k| {
534 col(k).eq(lit(value))
535 })
536 } else {
537 let value = value.as_i64().unwrap();
538 check_col_and_build_expr(span_key, resource_key, &key, col_names, |k| {
539 col(k).eq(lit(value))
540 })
541 }
542 }
543 JsonValue::Bool(value) => {
544 check_col_and_build_expr(span_key, resource_key, &key, col_names, |k| {
545 col(k).eq(lit(value))
546 })
547 }
548 JsonValue::Null => {
549 check_col_and_build_expr(span_key, resource_key, &key, col_names, |k| {
550 col(k).is_null()
551 })
552 }
553 JsonValue::Array(_value) => None,
555 JsonValue::Object(_value) => None,
556 }
557 })
558 .collect();
559 Ok(filters)
560}
561
562fn tags_filters(
563 dataframe: &DataFrame,
564 tags: HashMap<String, JsonValue>,
565 is_data_model_v1: bool,
566 col_names: &HashSet<String>,
567) -> ServerResult<Vec<Expr>> {
568 if is_data_model_v1 {
569 flatten_tag_filters(tags, col_names)
570 } else {
571 json_tag_filters(dataframe, tags)
572 }
573}
574
575async fn trace_ids_from_output(output: Output) -> ServerResult<Vec<String>> {
577 if let OutputData::Stream(stream) = output.data {
578 let schema = stream.schema().clone();
579 let recordbatches = util::collect(stream)
580 .await
581 .context(CollectRecordbatchSnafu)?;
582
583 if !recordbatches.is_empty()
585 && schema.num_columns() == 1
586 && schema.contains_column(TRACE_ID_COLUMN)
587 {
588 let mut trace_ids = vec![];
589 for recordbatch in recordbatches {
590 for col in recordbatch.columns().iter() {
591 for row_idx in 0..recordbatch.num_rows() {
592 if let ValueRef::String(value) = col.get_ref(row_idx) {
593 trace_ids.push(value.to_string());
594 }
595 }
596 }
597 }
598
599 return Ok(trace_ids);
600 }
601 }
602
603 Ok(vec![])
604}