1use std::collections::{HashMap, HashSet};
16use std::sync::Arc;
17
18use async_trait::async_trait;
19use catalog::CatalogManagerRef;
20use common_catalog::consts::{
21 TRACE_TABLE_NAME, trace_operations_table_name, trace_services_table_name,
22};
23use common_function::function::FunctionRef;
24use common_function::scalars::json::json_get::{
25 JsonGetBool, JsonGetFloat, JsonGetInt, JsonGetString,
26};
27use common_function::scalars::udf::create_udf;
28use common_query::{Output, OutputData};
29use common_recordbatch::adapter::RecordBatchStreamAdapter;
30use common_recordbatch::util;
31use common_telemetry::warn;
32use datafusion::dataframe::DataFrame;
33use datafusion::execution::SessionStateBuilder;
34use datafusion::execution::context::SessionContext;
35use datafusion::functions_window::expr_fn::row_number;
36use datafusion_expr::select_expr::SelectExpr;
37use datafusion_expr::{Expr, ExprFunctionExt, SortExpr, col, lit, lit_timestamp_nano, wildcard};
38use query::QueryEngineRef;
39use serde_json::Value as JsonValue;
40use servers::error::{
41 CatalogSnafu, CollectRecordbatchSnafu, DataFusionSnafu, Result as ServerResult,
42 TableNotFoundSnafu,
43};
44use servers::http::jaeger::{JAEGER_QUERY_TABLE_NAME_KEY, QueryTraceParams, TraceUserAgent};
45use servers::otlp::trace::{
46 DURATION_NANO_COLUMN, KEY_OTEL_STATUS_ERROR_KEY, SERVICE_NAME_COLUMN, SPAN_ATTRIBUTES_COLUMN,
47 SPAN_KIND_COLUMN, SPAN_KIND_PREFIX, SPAN_NAME_COLUMN, SPAN_STATUS_CODE, SPAN_STATUS_ERROR,
48 TIMESTAMP_COLUMN, TRACE_ID_COLUMN,
49};
50use servers::query_handler::JaegerQueryHandler;
51use session::context::QueryContextRef;
52use snafu::{OptionExt, ResultExt};
53use table::TableRef;
54use table::requests::{TABLE_DATA_MODEL, TABLE_DATA_MODEL_TRACE_V1};
55use table::table::adapter::DfTableProviderAdapter;
56
57use crate::instance::Instance;
58
/// Default maximum number of rows returned when the caller does not supply a
/// limit (used by `get_operations` and as fallback in `find_traces`).
const DEFAULT_LIMIT: usize = 2000;
/// Name of the synthetic row-number column added by `find_traces_rank_3` to
/// rank spans within a trace (only the first rows per trace are kept).
const KEY_RN: &str = "greptime_rn";
61
62#[async_trait]
63impl JaegerQueryHandler for Instance {
64 async fn get_services(&self, ctx: QueryContextRef) -> ServerResult<Output> {
65 Ok(query_trace_table(
67 ctx,
68 self.catalog_manager(),
69 self.query_engine(),
70 vec![SelectExpr::from(col(SERVICE_NAME_COLUMN))],
71 vec![],
72 vec![],
73 None,
74 None,
75 vec![col(SERVICE_NAME_COLUMN)],
76 )
77 .await?)
78 }
79
80 async fn get_operations(
81 &self,
82 ctx: QueryContextRef,
83 service_name: &str,
84 span_kind: Option<&str>,
85 ) -> ServerResult<Output> {
86 let mut filters = vec![col(SERVICE_NAME_COLUMN).eq(lit(service_name))];
87
88 if let Some(span_kind) = span_kind {
89 filters.push(col(SPAN_KIND_COLUMN).eq(lit(format!(
90 "{}{}",
91 SPAN_KIND_PREFIX,
92 span_kind.to_uppercase()
93 ))));
94 }
95
96 Ok(query_trace_table(
109 ctx,
110 self.catalog_manager(),
111 self.query_engine(),
112 vec![
113 SelectExpr::from(col(SPAN_NAME_COLUMN)),
114 SelectExpr::from(col(SPAN_KIND_COLUMN)),
115 SelectExpr::from(col(SERVICE_NAME_COLUMN)),
116 SelectExpr::from(col(TIMESTAMP_COLUMN)),
117 ],
118 filters,
119 vec![col(SPAN_NAME_COLUMN).sort(true, false)], Some(DEFAULT_LIMIT),
121 None,
122 vec![col(SPAN_NAME_COLUMN), col(SPAN_KIND_COLUMN)],
123 )
124 .await?)
125 }
126
127 async fn get_trace(
128 &self,
129 ctx: QueryContextRef,
130 trace_id: &str,
131 start_time: Option<i64>,
132 end_time: Option<i64>,
133 limit: Option<usize>,
134 ) -> ServerResult<Output> {
135 let selects = vec![wildcard()];
150
151 let mut filters = vec![col(TRACE_ID_COLUMN).eq(lit(trace_id))];
152
153 if let Some(start_time) = start_time {
154 filters.push(col(TIMESTAMP_COLUMN).gt_eq(lit_timestamp_nano(start_time)));
155 }
156
157 if let Some(end_time) = end_time {
158 filters.push(col(TIMESTAMP_COLUMN).lt_eq(lit_timestamp_nano(end_time)));
159 }
160
161 Ok(query_trace_table(
162 ctx,
163 self.catalog_manager(),
164 self.query_engine(),
165 selects,
166 filters,
167 vec![col(TIMESTAMP_COLUMN).sort(false, false)], limit,
169 None,
170 vec![],
171 )
172 .await?)
173 }
174
175 async fn find_traces(
176 &self,
177 ctx: QueryContextRef,
178 query_params: QueryTraceParams,
179 ) -> ServerResult<Output> {
180 let mut filters = vec![];
181
182 filters.push(col(SERVICE_NAME_COLUMN).eq(lit(query_params.service_name)));
184
185 if let Some(operation_name) = query_params.operation_name {
186 filters.push(col(SPAN_NAME_COLUMN).eq(lit(operation_name)));
187 }
188
189 if let Some(start_time) = query_params.start_time {
190 filters.push(col(TIMESTAMP_COLUMN).gt_eq(lit_timestamp_nano(start_time)));
191 }
192
193 if let Some(end_time) = query_params.end_time {
194 filters.push(col(TIMESTAMP_COLUMN).lt_eq(lit_timestamp_nano(end_time)));
195 }
196
197 if let Some(min_duration) = query_params.min_duration {
198 filters.push(col(DURATION_NANO_COLUMN).gt_eq(lit(min_duration)));
199 }
200
201 if let Some(max_duration) = query_params.max_duration {
202 filters.push(col(DURATION_NANO_COLUMN).lt_eq(lit(max_duration)));
203 }
204
205 let output = query_trace_table(
222 ctx.clone(),
223 self.catalog_manager(),
224 self.query_engine(),
225 vec![wildcard()],
226 filters,
227 vec![],
228 Some(query_params.limit.unwrap_or(DEFAULT_LIMIT)),
229 query_params.tags,
230 vec![col(TRACE_ID_COLUMN)],
231 )
232 .await?;
233
234 let mut filters = vec![
247 col(TRACE_ID_COLUMN).in_list(
248 trace_ids_from_output(output)
249 .await?
250 .iter()
251 .map(lit)
252 .collect::<Vec<Expr>>(),
253 false,
254 ),
255 ];
256
257 if let Some(start_time) = query_params.start_time {
258 filters.push(col(TIMESTAMP_COLUMN).gt_eq(lit_timestamp_nano(start_time)));
259 }
260
261 if let Some(end_time) = query_params.end_time {
262 filters.push(col(TIMESTAMP_COLUMN).lt_eq(lit_timestamp_nano(end_time)));
263 }
264
265 match query_params.user_agent {
266 TraceUserAgent::Grafana => {
267 let table_name = ctx
271 .extension(JAEGER_QUERY_TABLE_NAME_KEY)
272 .unwrap_or(TRACE_TABLE_NAME);
273
274 let table = get_table(ctx.clone(), self.catalog_manager(), table_name).await?;
275
276 Ok(find_traces_rank_3(
277 table,
278 self.query_engine(),
279 filters,
280 vec![col(TIMESTAMP_COLUMN).sort(false, false)], )
282 .await?)
283 }
284 _ => {
285 Ok(query_trace_table(
287 ctx,
288 self.catalog_manager(),
289 self.query_engine(),
290 vec![wildcard()],
291 filters,
292 vec![col(TIMESTAMP_COLUMN).sort(false, false)], None,
294 None,
295 vec![],
296 )
297 .await?)
298 }
299 }
300 }
301}
302
/// Builds and executes a query against the trace table (or one of its
/// derived tables) and returns the result as a record-batch stream.
///
/// Table routing is inferred from the query shape:
/// - selecting only the service-name column routes to the services table,
/// - distinct on span name + span kind routes to the operations table,
/// - everything else hits the trace table itself.
///
/// `tags` are translated into extra filter expressions; how they are
/// translated depends on whether the table uses the trace v1 data model
/// (flattened attribute columns) or a single JSON attributes column.
#[allow(clippy::too_many_arguments)]
async fn query_trace_table(
    ctx: QueryContextRef,
    catalog_manager: &CatalogManagerRef,
    query_engine: &QueryEngineRef,
    selects: Vec<SelectExpr>,
    filters: Vec<Expr>,
    sorts: Vec<SortExpr>,
    limit: Option<usize>,
    tags: Option<HashMap<String, JsonValue>>,
    distincts: Vec<Expr>,
) -> ServerResult<Output> {
    // The trace table name may be overridden per request via a query-context
    // extension; fall back to the default name otherwise.
    let trace_table_name = ctx
        .extension(JAEGER_QUERY_TABLE_NAME_KEY)
        .unwrap_or(TRACE_TABLE_NAME);

    // NOTE: the `&trace_..._table_name(...)` borrows of temporaries rely on
    // temporary lifetime extension through the enclosing `let` — do not
    // refactor this block without keeping that in mind.
    let table_name = {
        // A bare "SELECT service_name" goes to the dedicated services table.
        if match selects.as_slice() {
            [SelectExpr::Expression(x)] => x == &col(SERVICE_NAME_COLUMN),
            _ => false,
        } {
            &trace_services_table_name(trace_table_name)
        } else if !distincts.is_empty()
            && distincts.contains(&col(SPAN_NAME_COLUMN))
            && distincts.contains(&col(SPAN_KIND_COLUMN))
        {
            // Distinct span name/kind queries go to the operations table.
            &trace_operations_table_name(trace_table_name)
        } else {
            trace_table_name
        }
    };

    let table = catalog_manager
        .table(
            ctx.current_catalog(),
            &ctx.current_schema(),
            table_name,
            Some(&ctx),
        )
        .await
        .context(CatalogSnafu)?
        .with_context(|| TableNotFoundSnafu {
            table: table_name,
            catalog: ctx.current_catalog(),
            schema: ctx.current_schema(),
        })?;

    // Trace v1 tables flatten attributes into dedicated columns; detect the
    // model from the table's extra options.
    let is_data_model_v1 = table
        .clone()
        .table_info()
        .meta
        .options
        .extra_options
        .get(TABLE_DATA_MODEL)
        .map(|s| s.as_str())
        == Some(TABLE_DATA_MODEL_TRACE_V1);

    // Quoted field column names, matching the quoting used when tag filters
    // reference flattened attribute columns.
    let col_names = table
        .table_info()
        .meta
        .field_column_names()
        .map(|s| format!("\"{}\"", s))
        .collect::<HashSet<String>>();

    let df_context = create_df_context(query_engine)?;

    let dataframe = df_context
        .read_table(Arc::new(DfTableProviderAdapter::new(table)))
        .context(DataFusionSnafu)?;

    let dataframe = dataframe.select(selects).context(DataFusionSnafu)?;

    // Apply the explicit filters plus any tag-derived filters. The `?` after
    // `map_or` evaluates the tag filters (which borrow `dataframe`) before
    // `try_fold` takes ownership of it.
    let dataframe = filters
        .into_iter()
        .chain(tags.map_or(Ok(vec![]), |t| {
            tags_filters(&dataframe, t, is_data_model_v1, &col_names)
        })?)
        .try_fold(dataframe, |df, expr| {
            df.filter(expr).context(DataFusionSnafu)
        })?;

    let dataframe = if !distincts.is_empty() {
        dataframe
            .distinct_on(distincts.clone(), distincts, None)
            .context(DataFusionSnafu)?
    } else {
        dataframe
    };

    let dataframe = if !sorts.is_empty() {
        dataframe.sort(sorts).context(DataFusionSnafu)?
    } else {
        dataframe
    };

    let dataframe = if let Some(limit) = limit {
        dataframe.limit(0, Some(limit)).context(DataFusionSnafu)?
    } else {
        dataframe
    };

    let stream = dataframe.execute_stream().await.context(DataFusionSnafu)?;

    let output = Output::new_with_stream(Box::pin(
        RecordBatchStreamAdapter::try_new(stream).context(CollectRecordbatchSnafu)?,
    ));

    Ok(output)
}
420
421async fn get_table(
422 ctx: QueryContextRef,
423 catalog_manager: &CatalogManagerRef,
424 table_name: &str,
425) -> ServerResult<TableRef> {
426 catalog_manager
427 .table(
428 ctx.current_catalog(),
429 &ctx.current_schema(),
430 table_name,
431 Some(&ctx),
432 )
433 .await
434 .context(CatalogSnafu)?
435 .with_context(|| TableNotFoundSnafu {
436 table: table_name,
437 catalog: ctx.current_catalog(),
438 schema: ctx.current_schema(),
439 })
440}
441
442async fn find_traces_rank_3(
443 table: TableRef,
444 query_engine: &QueryEngineRef,
445 filters: Vec<Expr>,
446 sorts: Vec<SortExpr>,
447) -> ServerResult<Output> {
448 let df_context = create_df_context(query_engine)?;
449
450 let dataframe = df_context
451 .read_table(Arc::new(DfTableProviderAdapter::new(table)))
452 .context(DataFusionSnafu)?;
453
454 let dataframe = dataframe
455 .select(vec![wildcard()])
456 .context(DataFusionSnafu)?;
457
458 let dataframe = filters.into_iter().try_fold(dataframe, |df, expr| {
460 df.filter(expr).context(DataFusionSnafu)
461 })?;
462
463 let dataframe = if !sorts.is_empty() {
465 dataframe.sort(sorts).context(DataFusionSnafu)?
466 } else {
467 dataframe
468 };
469
470 let trace_id_col = vec![col(TRACE_ID_COLUMN)];
472 let timestamp_asc = vec![col(TIMESTAMP_COLUMN).sort(true, false)];
473
474 let dataframe = dataframe
475 .with_column(
476 KEY_RN,
477 row_number()
478 .partition_by(trace_id_col)
479 .order_by(timestamp_asc)
480 .build()
481 .context(DataFusionSnafu)?,
482 )
483 .context(DataFusionSnafu)?;
484
485 let dataframe = dataframe
486 .filter(col(KEY_RN).lt_eq(lit(3)))
487 .context(DataFusionSnafu)?;
488
489 let stream = dataframe.execute_stream().await.context(DataFusionSnafu)?;
491
492 let output = Output::new_with_stream(Box::pin(
493 RecordBatchStreamAdapter::try_new(stream).context(CollectRecordbatchSnafu)?,
494 ));
495
496 Ok(output)
497}
498
499fn create_df_context(query_engine: &QueryEngineRef) -> ServerResult<SessionContext> {
504 let df_context = SessionContext::new_with_state(
505 SessionStateBuilder::new_from_existing(query_engine.engine_state().session_state()).build(),
506 );
507
508 let udfs: Vec<FunctionRef> = vec![
510 Arc::new(JsonGetInt::default()),
511 Arc::new(JsonGetFloat::default()),
512 Arc::new(JsonGetBool::default()),
513 Arc::new(JsonGetString::default()),
514 ];
515
516 for udf in udfs {
517 df_context.register_udf(create_udf(udf));
518 }
519
520 Ok(df_context)
521}
522
523fn json_tag_filters(
524 dataframe: &DataFrame,
525 tags: HashMap<String, JsonValue>,
526) -> ServerResult<Vec<Expr>> {
527 let mut filters = vec![];
528
529 for (key, value) in tags.iter() {
531 if let JsonValue::String(value) = value {
532 filters.push(
533 dataframe
534 .registry()
535 .udf(JsonGetString::NAME)
536 .context(DataFusionSnafu)?
537 .call(vec![
538 col(SPAN_ATTRIBUTES_COLUMN),
539 lit(format!("[\"{}\"]", key)),
540 ])
541 .eq(lit(value)),
542 );
543 }
544 if let JsonValue::Number(value) = value {
545 if value.is_i64() {
546 filters.push(
547 dataframe
548 .registry()
549 .udf(JsonGetInt::NAME)
550 .context(DataFusionSnafu)?
551 .call(vec![
552 col(SPAN_ATTRIBUTES_COLUMN),
553 lit(format!("[\"{}\"]", key)),
554 ])
555 .eq(lit(value.as_i64().unwrap())),
556 );
557 }
558 if value.is_f64() {
559 filters.push(
560 dataframe
561 .registry()
562 .udf(JsonGetFloat::NAME)
563 .context(DataFusionSnafu)?
564 .call(vec![
565 col(SPAN_ATTRIBUTES_COLUMN),
566 lit(format!("[\"{}\"]", key)),
567 ])
568 .eq(lit(value.as_f64().unwrap())),
569 );
570 }
571 }
572 if let JsonValue::Bool(value) = value {
573 filters.push(
574 dataframe
575 .registry()
576 .udf(JsonGetBool::NAME)
577 .context(DataFusionSnafu)?
578 .call(vec![
579 col(SPAN_ATTRIBUTES_COLUMN),
580 lit(format!("[\"{}\"]", key)),
581 ])
582 .eq(lit(*value)),
583 );
584 }
585 }
586
587 Ok(filters)
588}
589
590#[inline]
593fn check_col_and_build_expr<F>(
594 span_key: String,
595 resource_key: String,
596 key: &str,
597 col_names: &HashSet<String>,
598 expr_builder: F,
599) -> Option<Expr>
600where
601 F: FnOnce(String) -> Expr,
602{
603 if col_names.contains(&span_key) {
604 return Some(expr_builder(span_key));
605 }
606 if col_names.contains(&resource_key) {
607 return Some(expr_builder(resource_key));
608 }
609 warn!("tag key {} not found in table columns", key);
610 None
611}
612
613fn flatten_tag_filters(
614 tags: HashMap<String, JsonValue>,
615 col_names: &HashSet<String>,
616) -> ServerResult<Vec<Expr>> {
617 let filters = tags
618 .into_iter()
619 .filter_map(|(key, value)| {
620 if key == KEY_OTEL_STATUS_ERROR_KEY && value == JsonValue::Bool(true) {
621 return Some(col(SPAN_STATUS_CODE).eq(lit(SPAN_STATUS_ERROR)));
622 }
623
624 let span_key = format!("\"span_attributes.{}\"", key);
626 let resource_key = format!("\"resource_attributes.{}\"", key);
627 match value {
628 JsonValue::String(value) => {
629 check_col_and_build_expr(span_key, resource_key, &key, col_names, |k| {
630 col(k).eq(lit(value))
631 })
632 }
633 JsonValue::Number(value) => {
634 if value.is_f64() {
635 let value = value.as_f64().unwrap();
637 check_col_and_build_expr(span_key, resource_key, &key, col_names, |k| {
638 col(k).eq(lit(value))
639 })
640 } else {
641 let value = value.as_i64().unwrap();
642 check_col_and_build_expr(span_key, resource_key, &key, col_names, |k| {
643 col(k).eq(lit(value))
644 })
645 }
646 }
647 JsonValue::Bool(value) => {
648 check_col_and_build_expr(span_key, resource_key, &key, col_names, |k| {
649 col(k).eq(lit(value))
650 })
651 }
652 JsonValue::Null => {
653 check_col_and_build_expr(span_key, resource_key, &key, col_names, |k| {
654 col(k).is_null()
655 })
656 }
657 JsonValue::Array(_value) => None,
659 JsonValue::Object(_value) => None,
660 }
661 })
662 .collect();
663 Ok(filters)
664}
665
666fn tags_filters(
667 dataframe: &DataFrame,
668 tags: HashMap<String, JsonValue>,
669 is_data_model_v1: bool,
670 col_names: &HashSet<String>,
671) -> ServerResult<Vec<Expr>> {
672 if is_data_model_v1 {
673 flatten_tag_filters(tags, col_names)
674 } else {
675 json_tag_filters(dataframe, tags)
676 }
677}
678
679async fn trace_ids_from_output(output: Output) -> ServerResult<Vec<String>> {
681 if let OutputData::Stream(stream) = output.data {
682 let schema = stream.schema().clone();
683 let recordbatches = util::collect(stream)
684 .await
685 .context(CollectRecordbatchSnafu)?;
686
687 if !recordbatches.is_empty()
689 && schema.num_columns() == 1
690 && schema.contains_column(TRACE_ID_COLUMN)
691 {
692 let mut trace_ids = vec![];
693 for recordbatch in recordbatches {
694 recordbatch
695 .iter_column_as_string(0)
696 .flatten()
697 .for_each(|x| trace_ids.push(x));
698 }
699
700 return Ok(trace_ids);
701 }
702 }
703
704 Ok(vec![])
705}