1use std::collections::{HashMap, HashSet};
16use std::sync::Arc;
17
18use async_trait::async_trait;
19use catalog::CatalogManagerRef;
20use common_catalog::consts::{
21 TRACE_TABLE_NAME, trace_operations_table_name, trace_services_table_name,
22};
23use common_function::function::FunctionRef;
24use common_function::scalars::json::json_get::{
25 JsonGetBool, JsonGetFloat, JsonGetInt, JsonGetString,
26};
27use common_function::scalars::udf::create_udf;
28use common_query::{Output, OutputData};
29use common_recordbatch::adapter::RecordBatchStreamAdapter;
30use common_recordbatch::util;
31use common_telemetry::warn;
32use datafusion::dataframe::DataFrame;
33use datafusion::execution::SessionStateBuilder;
34use datafusion::execution::context::SessionContext;
35use datafusion::functions_window::expr_fn::row_number;
36use datafusion_expr::select_expr::SelectExpr;
37use datafusion_expr::{Expr, ExprFunctionExt, SortExpr, col, lit, lit_timestamp_nano, wildcard};
38use query::QueryEngineRef;
39use serde_json::Value as JsonValue;
40use servers::error::{
41 CollectRecordbatchSnafu, DataFusionSnafu, Result as ServerResult, TableNotFoundSnafu,
42};
43use servers::http::jaeger::{JAEGER_QUERY_TABLE_NAME_KEY, QueryTraceParams, TraceUserAgent};
44use servers::otlp::trace::{
45 DURATION_NANO_COLUMN, KEY_OTEL_STATUS_ERROR_KEY, SERVICE_NAME_COLUMN, SPAN_ATTRIBUTES_COLUMN,
46 SPAN_KIND_COLUMN, SPAN_KIND_PREFIX, SPAN_NAME_COLUMN, SPAN_STATUS_CODE, SPAN_STATUS_ERROR,
47 TIMESTAMP_COLUMN, TRACE_ID_COLUMN,
48};
49use servers::query_handler::JaegerQueryHandler;
50use session::context::QueryContextRef;
51use snafu::{OptionExt, ResultExt};
52use table::TableRef;
53use table::requests::{TABLE_DATA_MODEL, TABLE_DATA_MODEL_TRACE_V1};
54use table::table::adapter::DfTableProviderAdapter;
55
56use crate::instance::Instance;
57
/// Default maximum number of rows returned by a Jaeger query when the caller
/// does not provide an explicit limit.
const DEFAULT_LIMIT: usize = 2000;
/// Alias for the `row_number()` window column used to rank spans per trace.
const KEY_RN: &str = "greptime_rn";
60
#[async_trait]
impl JaegerQueryHandler for Instance {
    /// Returns the distinct service names seen in the trace data.
    ///
    /// Selecting only `SERVICE_NAME_COLUMN` makes `query_trace_table` route
    /// the query to the derived services table.
    async fn get_services(&self, ctx: QueryContextRef) -> ServerResult<Output> {
        Ok(query_trace_table(
            ctx,
            self,
            vec![SelectExpr::from(col(SERVICE_NAME_COLUMN))],
            vec![],
            vec![],
            None,
            None,
            vec![col(SERVICE_NAME_COLUMN)],
        )
        .await?)
    }

    /// Returns the distinct operations (span name + span kind) of a service,
    /// optionally restricted to a single span kind.
    async fn get_operations(
        &self,
        ctx: QueryContextRef,
        service_name: &str,
        span_kind: Option<&str>,
    ) -> ServerResult<Output> {
        let mut filters = vec![col(SERVICE_NAME_COLUMN).eq(lit(service_name))];

        if let Some(span_kind) = span_kind {
            // Jaeger sends lowercase kinds (e.g. "server"); stored values are
            // prefixed and uppercased (e.g. "SPAN_KIND_SERVER").
            filters.push(col(SPAN_KIND_COLUMN).eq(lit(format!(
                "{}{}",
                SPAN_KIND_PREFIX,
                span_kind.to_uppercase()
            ))));
        }

        // Distinct on span name + kind routes this to the derived operations
        // table (see `query_trace_table`).
        Ok(query_trace_table(
            ctx,
            self,
            vec![
                SelectExpr::from(col(SPAN_NAME_COLUMN)),
                SelectExpr::from(col(SPAN_KIND_COLUMN)),
                SelectExpr::from(col(SERVICE_NAME_COLUMN)),
                SelectExpr::from(col(TIMESTAMP_COLUMN)),
            ],
            filters,
            vec![col(SPAN_NAME_COLUMN).sort(true, false)], Some(DEFAULT_LIMIT),
            None,
            vec![col(SPAN_NAME_COLUMN), col(SPAN_KIND_COLUMN)],
        )
        .await?)
    }

    /// Returns all spans of one trace, newest first, optionally bounded by a
    /// time range and a row limit.
    async fn get_trace(
        &self,
        ctx: QueryContextRef,
        trace_id: &str,
        start_time: Option<i64>,
        end_time: Option<i64>,
        limit: Option<usize>,
    ) -> ServerResult<Output> {
        let selects = vec![wildcard()];

        let mut filters = vec![col(TRACE_ID_COLUMN).eq(lit(trace_id))];

        // Time bounds are nanosecond timestamps (`lit_timestamp_nano`).
        if let Some(start_time) = start_time {
            filters.push(col(TIMESTAMP_COLUMN).gt_eq(lit_timestamp_nano(start_time)));
        }

        if let Some(end_time) = end_time {
            filters.push(col(TIMESTAMP_COLUMN).lt_eq(lit_timestamp_nano(end_time)));
        }

        Ok(query_trace_table(
            ctx,
            self,
            selects,
            filters,
            vec![col(TIMESTAMP_COLUMN).sort(false, false)], limit,
            None,
            vec![],
        )
        .await?)
    }

    /// Finds traces matching the query parameters in two phases:
    /// 1. select the distinct trace IDs that satisfy all filters (service,
    ///    operation, time range, duration, tags);
    /// 2. fetch the spans belonging to those trace IDs.
    async fn find_traces(
        &self,
        ctx: QueryContextRef,
        query_params: QueryTraceParams,
    ) -> ServerResult<Output> {
        let mut filters = vec![];

        filters.push(col(SERVICE_NAME_COLUMN).eq(lit(query_params.service_name)));

        if let Some(operation_name) = query_params.operation_name {
            filters.push(col(SPAN_NAME_COLUMN).eq(lit(operation_name)));
        }

        if let Some(start_time) = query_params.start_time {
            filters.push(col(TIMESTAMP_COLUMN).gt_eq(lit_timestamp_nano(start_time)));
        }

        if let Some(end_time) = query_params.end_time {
            filters.push(col(TIMESTAMP_COLUMN).lt_eq(lit_timestamp_nano(end_time)));
        }

        if let Some(min_duration) = query_params.min_duration {
            filters.push(col(DURATION_NANO_COLUMN).gt_eq(lit(min_duration)));
        }

        if let Some(max_duration) = query_params.max_duration {
            filters.push(col(DURATION_NANO_COLUMN).lt_eq(lit(max_duration)));
        }

        // Phase 1: distinct on trace_id projects a single-column result that
        // `trace_ids_from_output` can consume.
        let output = query_trace_table(
            ctx.clone(),
            self,
            vec![wildcard()],
            filters,
            vec![],
            Some(query_params.limit.unwrap_or(DEFAULT_LIMIT)),
            query_params.tags,
            vec![col(TRACE_ID_COLUMN)],
        )
        .await?;

        // Phase 2 filters: matched trace IDs, re-applying the time bounds so
        // the span scan stays pruned to the requested window.
        let mut filters = vec![
            col(TRACE_ID_COLUMN).in_list(
                trace_ids_from_output(output)
                    .await?
                    .iter()
                    .map(lit)
                    .collect::<Vec<Expr>>(),
                false,
            ),
        ];

        if let Some(start_time) = query_params.start_time {
            filters.push(col(TIMESTAMP_COLUMN).gt_eq(lit_timestamp_nano(start_time)));
        }

        if let Some(end_time) = query_params.end_time {
            filters.push(col(TIMESTAMP_COLUMN).lt_eq(lit_timestamp_nano(end_time)));
        }

        match query_params.user_agent {
            // Grafana only renders a few spans per trace in the list view, so
            // cap the result at 3 spans per trace via a window function.
            TraceUserAgent::Grafana => {
                let table_name = ctx
                    .extension(JAEGER_QUERY_TABLE_NAME_KEY)
                    .unwrap_or(TRACE_TABLE_NAME);

                let table = get_table(ctx.clone(), self.catalog_manager(), table_name).await?;

                Ok(find_traces_rank_3(
                    table,
                    self.query_engine(),
                    filters,
                    vec![col(TIMESTAMP_COLUMN).sort(false, false)], )
                .await?)
            }
            // Other clients get every span of the matched traces.
            _ => {
                Ok(query_trace_table(
                    ctx,
                    self,
                    vec![wildcard()],
                    filters,
                    vec![col(TIMESTAMP_COLUMN).sort(false, false)], None,
                    None,
                    vec![],
                )
                .await?)
            }
        }
    }
}
296
297#[allow(clippy::too_many_arguments)]
298async fn query_trace_table(
299 ctx: QueryContextRef,
300 instance: &Instance,
301 selects: Vec<SelectExpr>,
302 filters: Vec<Expr>,
303 sorts: Vec<SortExpr>,
304 limit: Option<usize>,
305 tags: Option<HashMap<String, JsonValue>>,
306 distincts: Vec<Expr>,
307) -> ServerResult<Output> {
308 let trace_table_name = ctx
309 .extension(JAEGER_QUERY_TABLE_NAME_KEY)
310 .unwrap_or(TRACE_TABLE_NAME);
311
312 let table_name = {
315 if match selects.as_slice() {
316 [SelectExpr::Expression(x)] => x == &col(SERVICE_NAME_COLUMN),
317 _ => false,
318 } {
319 &trace_services_table_name(trace_table_name)
320 } else if !distincts.is_empty()
321 && distincts.contains(&col(SPAN_NAME_COLUMN))
322 && distincts.contains(&col(SPAN_KIND_COLUMN))
323 {
324 &trace_operations_table_name(trace_table_name)
325 } else {
326 trace_table_name
327 }
328 };
329
330 let table = instance
331 .catalog_manager()
332 .table(
333 ctx.current_catalog(),
334 &ctx.current_schema(),
335 table_name,
336 Some(&ctx),
337 )
338 .await?
339 .with_context(|| TableNotFoundSnafu {
340 table: table_name,
341 catalog: ctx.current_catalog(),
342 schema: ctx.current_schema(),
343 })?;
344
345 let is_data_model_v1 = table
346 .clone()
347 .table_info()
348 .meta
349 .options
350 .extra_options
351 .get(TABLE_DATA_MODEL)
352 .map(|s| s.as_str())
353 == Some(TABLE_DATA_MODEL_TRACE_V1);
354
355 let col_names = table
357 .table_info()
358 .meta
359 .field_column_names()
360 .map(|s| format!("\"{}\"", s))
361 .collect::<HashSet<String>>();
362
363 let df_context = create_df_context(instance.query_engine())?;
364
365 let dataframe = df_context
366 .read_table(Arc::new(DfTableProviderAdapter::new(table)))
367 .context(DataFusionSnafu)?;
368
369 let dataframe = dataframe.select(selects).context(DataFusionSnafu)?;
370
371 let dataframe = filters
373 .into_iter()
374 .chain(tags.map_or(Ok(vec![]), |t| {
375 tags_filters(&dataframe, t, is_data_model_v1, &col_names)
376 })?)
377 .try_fold(dataframe, |df, expr| {
378 df.filter(expr).context(DataFusionSnafu)
379 })?;
380
381 let dataframe = if !distincts.is_empty() {
383 dataframe
384 .distinct_on(distincts.clone(), distincts, None)
385 .context(DataFusionSnafu)?
386 } else {
387 dataframe
388 };
389
390 let dataframe = if !sorts.is_empty() {
392 dataframe.sort(sorts).context(DataFusionSnafu)?
393 } else {
394 dataframe
395 };
396
397 let dataframe = if let Some(limit) = limit {
399 dataframe.limit(0, Some(limit)).context(DataFusionSnafu)?
400 } else {
401 dataframe
402 };
403
404 let stream = dataframe.execute_stream().await.context(DataFusionSnafu)?;
406
407 let output = Output::new_with_stream(Box::pin(
408 RecordBatchStreamAdapter::try_new(stream).context(CollectRecordbatchSnafu)?,
409 ));
410
411 Ok(output)
412}
413
414async fn get_table(
415 ctx: QueryContextRef,
416 catalog_manager: &CatalogManagerRef,
417 table_name: &str,
418) -> ServerResult<TableRef> {
419 catalog_manager
420 .table(
421 ctx.current_catalog(),
422 &ctx.current_schema(),
423 table_name,
424 Some(&ctx),
425 )
426 .await?
427 .with_context(|| TableNotFoundSnafu {
428 table: table_name,
429 catalog: ctx.current_catalog(),
430 schema: ctx.current_schema(),
431 })
432}
433
/// Runs the span query but keeps only the first 3 spans (by ascending
/// timestamp) of every trace, using a `row_number()` window partitioned by
/// trace ID. Used for Grafana, which only renders a few spans per trace.
async fn find_traces_rank_3(
    table: TableRef,
    query_engine: &QueryEngineRef,
    filters: Vec<Expr>,
    sorts: Vec<SortExpr>,
) -> ServerResult<Output> {
    let df_context = create_df_context(query_engine)?;

    let dataframe = df_context
        .read_table(Arc::new(DfTableProviderAdapter::new(table)))
        .context(DataFusionSnafu)?;

    let dataframe = dataframe
        .select(vec![wildcard()])
        .context(DataFusionSnafu)?;

    // Apply each filter in turn; any planning error aborts the query.
    let dataframe = filters.into_iter().try_fold(dataframe, |df, expr| {
        df.filter(expr).context(DataFusionSnafu)
    })?;

    let dataframe = if !sorts.is_empty() {
        dataframe.sort(sorts).context(DataFusionSnafu)?
    } else {
        dataframe
    };

    let trace_id_col = vec![col(TRACE_ID_COLUMN)];
    let timestamp_asc = vec![col(TIMESTAMP_COLUMN).sort(true, false)];

    // Rank spans within each trace: row_number() over
    // (partition by trace_id order by timestamp asc), exposed as KEY_RN.
    let dataframe = dataframe
        .with_column(
            KEY_RN,
            row_number()
                .partition_by(trace_id_col)
                .order_by(timestamp_asc)
                .build()
                .context(DataFusionSnafu)?,
        )
        .context(DataFusionSnafu)?;

    // Keep only the 3 earliest spans of each trace.
    let dataframe = dataframe
        .filter(col(KEY_RN).lt_eq(lit(3)))
        .context(DataFusionSnafu)?;

    let stream = dataframe.execute_stream().await.context(DataFusionSnafu)?;

    let output = Output::new_with_stream(Box::pin(
        RecordBatchStreamAdapter::try_new(stream).context(CollectRecordbatchSnafu)?,
    ));

    Ok(output)
}
490
491fn create_df_context(query_engine: &QueryEngineRef) -> ServerResult<SessionContext> {
496 let df_context = SessionContext::new_with_state(
497 SessionStateBuilder::new_from_existing(query_engine.engine_state().session_state()).build(),
498 );
499
500 let udfs: Vec<FunctionRef> = vec![
502 Arc::new(JsonGetInt::default()),
503 Arc::new(JsonGetFloat::default()),
504 Arc::new(JsonGetBool::default()),
505 Arc::new(JsonGetString::default()),
506 ];
507
508 for udf in udfs {
509 df_context.register_udf(create_udf(udf));
510 }
511
512 Ok(df_context)
513}
514
515fn json_tag_filters(
516 dataframe: &DataFrame,
517 tags: HashMap<String, JsonValue>,
518) -> ServerResult<Vec<Expr>> {
519 let mut filters = vec![];
520
521 for (key, value) in tags.iter() {
523 if let JsonValue::String(value) = value {
524 filters.push(
525 dataframe
526 .registry()
527 .udf(JsonGetString::NAME)
528 .context(DataFusionSnafu)?
529 .call(vec![
530 col(SPAN_ATTRIBUTES_COLUMN),
531 lit(format!("[\"{}\"]", key)),
532 ])
533 .eq(lit(value)),
534 );
535 }
536 if let JsonValue::Number(value) = value {
537 if value.is_i64() {
538 filters.push(
539 dataframe
540 .registry()
541 .udf(JsonGetInt::NAME)
542 .context(DataFusionSnafu)?
543 .call(vec![
544 col(SPAN_ATTRIBUTES_COLUMN),
545 lit(format!("[\"{}\"]", key)),
546 ])
547 .eq(lit(value.as_i64().unwrap())),
548 );
549 }
550 if value.is_f64() {
551 filters.push(
552 dataframe
553 .registry()
554 .udf(JsonGetFloat::NAME)
555 .context(DataFusionSnafu)?
556 .call(vec![
557 col(SPAN_ATTRIBUTES_COLUMN),
558 lit(format!("[\"{}\"]", key)),
559 ])
560 .eq(lit(value.as_f64().unwrap())),
561 );
562 }
563 }
564 if let JsonValue::Bool(value) = value {
565 filters.push(
566 dataframe
567 .registry()
568 .udf(JsonGetBool::NAME)
569 .context(DataFusionSnafu)?
570 .call(vec![
571 col(SPAN_ATTRIBUTES_COLUMN),
572 lit(format!("[\"{}\"]", key)),
573 ])
574 .eq(lit(*value)),
575 );
576 }
577 }
578
579 Ok(filters)
580}
581
582#[inline]
585fn check_col_and_build_expr<F>(
586 span_key: String,
587 resource_key: String,
588 key: &str,
589 col_names: &HashSet<String>,
590 expr_builder: F,
591) -> Option<Expr>
592where
593 F: FnOnce(String) -> Expr,
594{
595 if col_names.contains(&span_key) {
596 return Some(expr_builder(span_key));
597 }
598 if col_names.contains(&resource_key) {
599 return Some(expr_builder(resource_key));
600 }
601 warn!("tag key {} not found in table columns", key);
602 None
603}
604
605fn flatten_tag_filters(
606 tags: HashMap<String, JsonValue>,
607 col_names: &HashSet<String>,
608) -> ServerResult<Vec<Expr>> {
609 let filters = tags
610 .into_iter()
611 .filter_map(|(key, value)| {
612 if key == KEY_OTEL_STATUS_ERROR_KEY && value == JsonValue::Bool(true) {
613 return Some(col(SPAN_STATUS_CODE).eq(lit(SPAN_STATUS_ERROR)));
614 }
615
616 let span_key = format!("\"span_attributes.{}\"", key);
618 let resource_key = format!("\"resource_attributes.{}\"", key);
619 match value {
620 JsonValue::String(value) => {
621 check_col_and_build_expr(span_key, resource_key, &key, col_names, |k| {
622 col(k).eq(lit(value))
623 })
624 }
625 JsonValue::Number(value) => {
626 if value.is_f64() {
627 let value = value.as_f64().unwrap();
629 check_col_and_build_expr(span_key, resource_key, &key, col_names, |k| {
630 col(k).eq(lit(value))
631 })
632 } else {
633 let value = value.as_i64().unwrap();
634 check_col_and_build_expr(span_key, resource_key, &key, col_names, |k| {
635 col(k).eq(lit(value))
636 })
637 }
638 }
639 JsonValue::Bool(value) => {
640 check_col_and_build_expr(span_key, resource_key, &key, col_names, |k| {
641 col(k).eq(lit(value))
642 })
643 }
644 JsonValue::Null => {
645 check_col_and_build_expr(span_key, resource_key, &key, col_names, |k| {
646 col(k).is_null()
647 })
648 }
649 JsonValue::Array(_value) => None,
651 JsonValue::Object(_value) => None,
652 }
653 })
654 .collect();
655 Ok(filters)
656}
657
658fn tags_filters(
659 dataframe: &DataFrame,
660 tags: HashMap<String, JsonValue>,
661 is_data_model_v1: bool,
662 col_names: &HashSet<String>,
663) -> ServerResult<Vec<Expr>> {
664 if is_data_model_v1 {
665 flatten_tag_filters(tags, col_names)
666 } else {
667 json_tag_filters(dataframe, tags)
668 }
669}
670
671async fn trace_ids_from_output(output: Output) -> ServerResult<Vec<String>> {
673 if let OutputData::Stream(stream) = output.data {
674 let schema = stream.schema().clone();
675 let recordbatches = util::collect(stream)
676 .await
677 .context(CollectRecordbatchSnafu)?;
678
679 if !recordbatches.is_empty()
681 && schema.num_columns() == 1
682 && schema.contains_column(TRACE_ID_COLUMN)
683 {
684 let mut trace_ids = vec![];
685 for recordbatch in recordbatches {
686 recordbatch
687 .iter_column_as_string(0)
688 .flatten()
689 .for_each(|x| trace_ids.push(x));
690 }
691
692 return Ok(trace_ids);
693 }
694 }
695
696 Ok(vec![])
697}