1use std::collections::{HashMap, HashSet};
16use std::sync::Arc;
17
18use async_trait::async_trait;
19use catalog::CatalogManagerRef;
20use common_catalog::consts::{
21 TRACE_TABLE_NAME, trace_operations_table_name, trace_services_table_name,
22};
23use common_function::function::FunctionRef;
24use common_function::scalars::json::json_get::{
25 JsonGetBool, JsonGetFloat, JsonGetInt, JsonGetString,
26};
27use common_function::scalars::udf::create_udf;
28use common_query::{Output, OutputData};
29use common_recordbatch::adapter::RecordBatchStreamAdapter;
30use common_recordbatch::util;
31use common_telemetry::warn;
32use datafusion::dataframe::DataFrame;
33use datafusion::execution::SessionStateBuilder;
34use datafusion::execution::context::SessionContext;
35use datafusion::functions_window::expr_fn::row_number;
36use datafusion_expr::select_expr::SelectExpr;
37use datafusion_expr::{Expr, ExprFunctionExt, SortExpr, col, lit, lit_timestamp_nano, wildcard};
38use query::QueryEngineRef;
39use serde_json::Value as JsonValue;
40use servers::error::{
41 CatalogSnafu, CollectRecordbatchSnafu, DataFusionSnafu, Result as ServerResult,
42 TableNotFoundSnafu,
43};
44use servers::http::jaeger::{JAEGER_QUERY_TABLE_NAME_KEY, QueryTraceParams, TraceUserAgent};
45use servers::otlp::trace::{
46 DURATION_NANO_COLUMN, KEY_OTEL_STATUS_ERROR_KEY, SERVICE_NAME_COLUMN, SPAN_ATTRIBUTES_COLUMN,
47 SPAN_KIND_COLUMN, SPAN_KIND_PREFIX, SPAN_NAME_COLUMN, SPAN_STATUS_CODE, SPAN_STATUS_ERROR,
48 TIMESTAMP_COLUMN, TRACE_ID_COLUMN,
49};
50use servers::query_handler::JaegerQueryHandler;
51use session::context::QueryContextRef;
52use snafu::{OptionExt, ResultExt};
53use table::TableRef;
54use table::requests::{TABLE_DATA_MODEL, TABLE_DATA_MODEL_TRACE_V1};
55use table::table::adapter::DfTableProviderAdapter;
56
57use crate::instance::Instance;
58
/// Row cap used for operation listings, and for the trace-id search in `find_traces` when the
/// caller supplies no limit.
const DEFAULT_LIMIT: usize = 2_000;
/// Name of the column holding each span's `row_number()` rank within its trace
/// (see `find_traces_rank_3`).
const KEY_RN: &str = "greptime_rn";
61
62#[async_trait]
63impl JaegerQueryHandler for Instance {
64 async fn get_services(&self, ctx: QueryContextRef) -> ServerResult<Output> {
65 Ok(query_trace_table(
67 ctx,
68 self,
69 vec![SelectExpr::from(col(SERVICE_NAME_COLUMN))],
70 vec![],
71 vec![],
72 None,
73 None,
74 vec![col(SERVICE_NAME_COLUMN)],
75 )
76 .await?)
77 }
78
79 async fn get_operations(
80 &self,
81 ctx: QueryContextRef,
82 service_name: &str,
83 span_kind: Option<&str>,
84 ) -> ServerResult<Output> {
85 let mut filters = vec![col(SERVICE_NAME_COLUMN).eq(lit(service_name))];
86
87 if let Some(span_kind) = span_kind {
88 filters.push(col(SPAN_KIND_COLUMN).eq(lit(format!(
89 "{}{}",
90 SPAN_KIND_PREFIX,
91 span_kind.to_uppercase()
92 ))));
93 }
94
95 Ok(query_trace_table(
108 ctx,
109 self,
110 vec![
111 SelectExpr::from(col(SPAN_NAME_COLUMN)),
112 SelectExpr::from(col(SPAN_KIND_COLUMN)),
113 SelectExpr::from(col(SERVICE_NAME_COLUMN)),
114 SelectExpr::from(col(TIMESTAMP_COLUMN)),
115 ],
116 filters,
117 vec![col(SPAN_NAME_COLUMN).sort(true, false)], Some(DEFAULT_LIMIT),
119 None,
120 vec![col(SPAN_NAME_COLUMN), col(SPAN_KIND_COLUMN)],
121 )
122 .await?)
123 }
124
125 async fn get_trace(
126 &self,
127 ctx: QueryContextRef,
128 trace_id: &str,
129 start_time: Option<i64>,
130 end_time: Option<i64>,
131 limit: Option<usize>,
132 ) -> ServerResult<Output> {
133 let selects = vec![wildcard()];
148
149 let mut filters = vec![col(TRACE_ID_COLUMN).eq(lit(trace_id))];
150
151 if let Some(start_time) = start_time {
152 filters.push(col(TIMESTAMP_COLUMN).gt_eq(lit_timestamp_nano(start_time)));
153 }
154
155 if let Some(end_time) = end_time {
156 filters.push(col(TIMESTAMP_COLUMN).lt_eq(lit_timestamp_nano(end_time)));
157 }
158
159 Ok(query_trace_table(
160 ctx,
161 self,
162 selects,
163 filters,
164 vec![col(TIMESTAMP_COLUMN).sort(false, false)], limit,
166 None,
167 vec![],
168 )
169 .await?)
170 }
171
172 async fn find_traces(
173 &self,
174 ctx: QueryContextRef,
175 query_params: QueryTraceParams,
176 ) -> ServerResult<Output> {
177 let mut filters = vec![];
178
179 filters.push(col(SERVICE_NAME_COLUMN).eq(lit(query_params.service_name)));
181
182 if let Some(operation_name) = query_params.operation_name {
183 filters.push(col(SPAN_NAME_COLUMN).eq(lit(operation_name)));
184 }
185
186 if let Some(start_time) = query_params.start_time {
187 filters.push(col(TIMESTAMP_COLUMN).gt_eq(lit_timestamp_nano(start_time)));
188 }
189
190 if let Some(end_time) = query_params.end_time {
191 filters.push(col(TIMESTAMP_COLUMN).lt_eq(lit_timestamp_nano(end_time)));
192 }
193
194 if let Some(min_duration) = query_params.min_duration {
195 filters.push(col(DURATION_NANO_COLUMN).gt_eq(lit(min_duration)));
196 }
197
198 if let Some(max_duration) = query_params.max_duration {
199 filters.push(col(DURATION_NANO_COLUMN).lt_eq(lit(max_duration)));
200 }
201
202 let output = query_trace_table(
219 ctx.clone(),
220 self,
221 vec![wildcard()],
222 filters,
223 vec![],
224 Some(query_params.limit.unwrap_or(DEFAULT_LIMIT)),
225 query_params.tags,
226 vec![col(TRACE_ID_COLUMN)],
227 )
228 .await?;
229
230 let mut filters = vec![
243 col(TRACE_ID_COLUMN).in_list(
244 trace_ids_from_output(output)
245 .await?
246 .iter()
247 .map(lit)
248 .collect::<Vec<Expr>>(),
249 false,
250 ),
251 ];
252
253 if let Some(start_time) = query_params.start_time {
254 filters.push(col(TIMESTAMP_COLUMN).gt_eq(lit_timestamp_nano(start_time)));
255 }
256
257 if let Some(end_time) = query_params.end_time {
258 filters.push(col(TIMESTAMP_COLUMN).lt_eq(lit_timestamp_nano(end_time)));
259 }
260
261 match query_params.user_agent {
262 TraceUserAgent::Grafana => {
263 let table_name = ctx
267 .extension(JAEGER_QUERY_TABLE_NAME_KEY)
268 .unwrap_or(TRACE_TABLE_NAME);
269
270 let table = get_table(ctx.clone(), self.catalog_manager(), table_name).await?;
271
272 Ok(find_traces_rank_3(
273 table,
274 self.query_engine(),
275 filters,
276 vec![col(TIMESTAMP_COLUMN).sort(false, false)], )
278 .await?)
279 }
280 _ => {
281 Ok(query_trace_table(
283 ctx,
284 self,
285 vec![wildcard()],
286 filters,
287 vec![col(TIMESTAMP_COLUMN).sort(false, false)], None,
289 None,
290 vec![],
291 )
292 .await?)
293 }
294 }
295 }
296}
297
298#[allow(clippy::too_many_arguments)]
299async fn query_trace_table(
300 ctx: QueryContextRef,
301 instance: &Instance,
302 selects: Vec<SelectExpr>,
303 filters: Vec<Expr>,
304 sorts: Vec<SortExpr>,
305 limit: Option<usize>,
306 tags: Option<HashMap<String, JsonValue>>,
307 distincts: Vec<Expr>,
308) -> ServerResult<Output> {
309 let trace_table_name = ctx
310 .extension(JAEGER_QUERY_TABLE_NAME_KEY)
311 .unwrap_or(TRACE_TABLE_NAME);
312
313 let table_name = {
316 if match selects.as_slice() {
317 [SelectExpr::Expression(x)] => x == &col(SERVICE_NAME_COLUMN),
318 _ => false,
319 } {
320 &trace_services_table_name(trace_table_name)
321 } else if !distincts.is_empty()
322 && distincts.contains(&col(SPAN_NAME_COLUMN))
323 && distincts.contains(&col(SPAN_KIND_COLUMN))
324 {
325 &trace_operations_table_name(trace_table_name)
326 } else {
327 trace_table_name
328 }
329 };
330
331 let table = instance
332 .catalog_manager()
333 .table(
334 ctx.current_catalog(),
335 &ctx.current_schema(),
336 table_name,
337 Some(&ctx),
338 )
339 .await
340 .context(CatalogSnafu)?
341 .with_context(|| TableNotFoundSnafu {
342 table: table_name,
343 catalog: ctx.current_catalog(),
344 schema: ctx.current_schema(),
345 })?;
346
347 let is_data_model_v1 = table
348 .clone()
349 .table_info()
350 .meta
351 .options
352 .extra_options
353 .get(TABLE_DATA_MODEL)
354 .map(|s| s.as_str())
355 == Some(TABLE_DATA_MODEL_TRACE_V1);
356
357 let col_names = table
359 .table_info()
360 .meta
361 .field_column_names()
362 .map(|s| format!("\"{}\"", s))
363 .collect::<HashSet<String>>();
364
365 let df_context = create_df_context(instance.query_engine())?;
366
367 let dataframe = df_context
368 .read_table(Arc::new(DfTableProviderAdapter::new(table)))
369 .context(DataFusionSnafu)?;
370
371 let dataframe = dataframe.select(selects).context(DataFusionSnafu)?;
372
373 let dataframe = filters
375 .into_iter()
376 .chain(tags.map_or(Ok(vec![]), |t| {
377 tags_filters(&dataframe, t, is_data_model_v1, &col_names)
378 })?)
379 .try_fold(dataframe, |df, expr| {
380 df.filter(expr).context(DataFusionSnafu)
381 })?;
382
383 let dataframe = if !distincts.is_empty() {
385 dataframe
386 .distinct_on(distincts.clone(), distincts, None)
387 .context(DataFusionSnafu)?
388 } else {
389 dataframe
390 };
391
392 let dataframe = if !sorts.is_empty() {
394 dataframe.sort(sorts).context(DataFusionSnafu)?
395 } else {
396 dataframe
397 };
398
399 let dataframe = if let Some(limit) = limit {
401 dataframe.limit(0, Some(limit)).context(DataFusionSnafu)?
402 } else {
403 dataframe
404 };
405
406 let stream = dataframe.execute_stream().await.context(DataFusionSnafu)?;
408
409 let output = Output::new_with_stream(Box::pin(
410 RecordBatchStreamAdapter::try_new(stream).context(CollectRecordbatchSnafu)?,
411 ));
412
413 Ok(output)
414}
415
416async fn get_table(
417 ctx: QueryContextRef,
418 catalog_manager: &CatalogManagerRef,
419 table_name: &str,
420) -> ServerResult<TableRef> {
421 catalog_manager
422 .table(
423 ctx.current_catalog(),
424 &ctx.current_schema(),
425 table_name,
426 Some(&ctx),
427 )
428 .await
429 .context(CatalogSnafu)?
430 .with_context(|| TableNotFoundSnafu {
431 table: table_name,
432 catalog: ctx.current_catalog(),
433 schema: ctx.current_schema(),
434 })
435}
436
437async fn find_traces_rank_3(
438 table: TableRef,
439 query_engine: &QueryEngineRef,
440 filters: Vec<Expr>,
441 sorts: Vec<SortExpr>,
442) -> ServerResult<Output> {
443 let df_context = create_df_context(query_engine)?;
444
445 let dataframe = df_context
446 .read_table(Arc::new(DfTableProviderAdapter::new(table)))
447 .context(DataFusionSnafu)?;
448
449 let dataframe = dataframe
450 .select(vec![wildcard()])
451 .context(DataFusionSnafu)?;
452
453 let dataframe = filters.into_iter().try_fold(dataframe, |df, expr| {
455 df.filter(expr).context(DataFusionSnafu)
456 })?;
457
458 let dataframe = if !sorts.is_empty() {
460 dataframe.sort(sorts).context(DataFusionSnafu)?
461 } else {
462 dataframe
463 };
464
465 let trace_id_col = vec![col(TRACE_ID_COLUMN)];
467 let timestamp_asc = vec![col(TIMESTAMP_COLUMN).sort(true, false)];
468
469 let dataframe = dataframe
470 .with_column(
471 KEY_RN,
472 row_number()
473 .partition_by(trace_id_col)
474 .order_by(timestamp_asc)
475 .build()
476 .context(DataFusionSnafu)?,
477 )
478 .context(DataFusionSnafu)?;
479
480 let dataframe = dataframe
481 .filter(col(KEY_RN).lt_eq(lit(3)))
482 .context(DataFusionSnafu)?;
483
484 let stream = dataframe.execute_stream().await.context(DataFusionSnafu)?;
486
487 let output = Output::new_with_stream(Box::pin(
488 RecordBatchStreamAdapter::try_new(stream).context(CollectRecordbatchSnafu)?,
489 ));
490
491 Ok(output)
492}
493
494fn create_df_context(query_engine: &QueryEngineRef) -> ServerResult<SessionContext> {
499 let df_context = SessionContext::new_with_state(
500 SessionStateBuilder::new_from_existing(query_engine.engine_state().session_state()).build(),
501 );
502
503 let udfs: Vec<FunctionRef> = vec![
505 Arc::new(JsonGetInt::default()),
506 Arc::new(JsonGetFloat::default()),
507 Arc::new(JsonGetBool::default()),
508 Arc::new(JsonGetString::default()),
509 ];
510
511 for udf in udfs {
512 df_context.register_udf(create_udf(udf));
513 }
514
515 Ok(df_context)
516}
517
518fn json_tag_filters(
519 dataframe: &DataFrame,
520 tags: HashMap<String, JsonValue>,
521) -> ServerResult<Vec<Expr>> {
522 let mut filters = vec![];
523
524 for (key, value) in tags.iter() {
526 if let JsonValue::String(value) = value {
527 filters.push(
528 dataframe
529 .registry()
530 .udf(JsonGetString::NAME)
531 .context(DataFusionSnafu)?
532 .call(vec![
533 col(SPAN_ATTRIBUTES_COLUMN),
534 lit(format!("[\"{}\"]", key)),
535 ])
536 .eq(lit(value)),
537 );
538 }
539 if let JsonValue::Number(value) = value {
540 if value.is_i64() {
541 filters.push(
542 dataframe
543 .registry()
544 .udf(JsonGetInt::NAME)
545 .context(DataFusionSnafu)?
546 .call(vec![
547 col(SPAN_ATTRIBUTES_COLUMN),
548 lit(format!("[\"{}\"]", key)),
549 ])
550 .eq(lit(value.as_i64().unwrap())),
551 );
552 }
553 if value.is_f64() {
554 filters.push(
555 dataframe
556 .registry()
557 .udf(JsonGetFloat::NAME)
558 .context(DataFusionSnafu)?
559 .call(vec![
560 col(SPAN_ATTRIBUTES_COLUMN),
561 lit(format!("[\"{}\"]", key)),
562 ])
563 .eq(lit(value.as_f64().unwrap())),
564 );
565 }
566 }
567 if let JsonValue::Bool(value) = value {
568 filters.push(
569 dataframe
570 .registry()
571 .udf(JsonGetBool::NAME)
572 .context(DataFusionSnafu)?
573 .call(vec![
574 col(SPAN_ATTRIBUTES_COLUMN),
575 lit(format!("[\"{}\"]", key)),
576 ])
577 .eq(lit(*value)),
578 );
579 }
580 }
581
582 Ok(filters)
583}
584
585#[inline]
588fn check_col_and_build_expr<F>(
589 span_key: String,
590 resource_key: String,
591 key: &str,
592 col_names: &HashSet<String>,
593 expr_builder: F,
594) -> Option<Expr>
595where
596 F: FnOnce(String) -> Expr,
597{
598 if col_names.contains(&span_key) {
599 return Some(expr_builder(span_key));
600 }
601 if col_names.contains(&resource_key) {
602 return Some(expr_builder(resource_key));
603 }
604 warn!("tag key {} not found in table columns", key);
605 None
606}
607
608fn flatten_tag_filters(
609 tags: HashMap<String, JsonValue>,
610 col_names: &HashSet<String>,
611) -> ServerResult<Vec<Expr>> {
612 let filters = tags
613 .into_iter()
614 .filter_map(|(key, value)| {
615 if key == KEY_OTEL_STATUS_ERROR_KEY && value == JsonValue::Bool(true) {
616 return Some(col(SPAN_STATUS_CODE).eq(lit(SPAN_STATUS_ERROR)));
617 }
618
619 let span_key = format!("\"span_attributes.{}\"", key);
621 let resource_key = format!("\"resource_attributes.{}\"", key);
622 match value {
623 JsonValue::String(value) => {
624 check_col_and_build_expr(span_key, resource_key, &key, col_names, |k| {
625 col(k).eq(lit(value))
626 })
627 }
628 JsonValue::Number(value) => {
629 if value.is_f64() {
630 let value = value.as_f64().unwrap();
632 check_col_and_build_expr(span_key, resource_key, &key, col_names, |k| {
633 col(k).eq(lit(value))
634 })
635 } else {
636 let value = value.as_i64().unwrap();
637 check_col_and_build_expr(span_key, resource_key, &key, col_names, |k| {
638 col(k).eq(lit(value))
639 })
640 }
641 }
642 JsonValue::Bool(value) => {
643 check_col_and_build_expr(span_key, resource_key, &key, col_names, |k| {
644 col(k).eq(lit(value))
645 })
646 }
647 JsonValue::Null => {
648 check_col_and_build_expr(span_key, resource_key, &key, col_names, |k| {
649 col(k).is_null()
650 })
651 }
652 JsonValue::Array(_value) => None,
654 JsonValue::Object(_value) => None,
655 }
656 })
657 .collect();
658 Ok(filters)
659}
660
661fn tags_filters(
662 dataframe: &DataFrame,
663 tags: HashMap<String, JsonValue>,
664 is_data_model_v1: bool,
665 col_names: &HashSet<String>,
666) -> ServerResult<Vec<Expr>> {
667 if is_data_model_v1 {
668 flatten_tag_filters(tags, col_names)
669 } else {
670 json_tag_filters(dataframe, tags)
671 }
672}
673
674async fn trace_ids_from_output(output: Output) -> ServerResult<Vec<String>> {
676 if let OutputData::Stream(stream) = output.data {
677 let schema = stream.schema().clone();
678 let recordbatches = util::collect(stream)
679 .await
680 .context(CollectRecordbatchSnafu)?;
681
682 if !recordbatches.is_empty()
684 && schema.num_columns() == 1
685 && schema.contains_column(TRACE_ID_COLUMN)
686 {
687 let mut trace_ids = vec![];
688 for recordbatch in recordbatches {
689 recordbatch
690 .iter_column_as_string(0)
691 .flatten()
692 .for_each(|x| trace_ids.push(x));
693 }
694
695 return Ok(trace_ids);
696 }
697 }
698
699 Ok(vec![])
700}