Skip to main content

frontend/instance/
jaeger.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{HashMap, HashSet};
16use std::sync::Arc;
17
18use async_trait::async_trait;
19use catalog::CatalogManagerRef;
20use common_catalog::consts::{
21    TRACE_TABLE_NAME, trace_operations_table_name, trace_services_table_name,
22};
23use common_function::function::FunctionRef;
24use common_function::scalars::json::json_get::{
25    JsonGetBool, JsonGetFloat, JsonGetInt, JsonGetString,
26};
27use common_function::scalars::udf::create_udf;
28use common_query::{Output, OutputData};
29use common_recordbatch::adapter::RecordBatchStreamAdapter;
30use common_recordbatch::util;
31use common_telemetry::warn;
32use datafusion::dataframe::DataFrame;
33use datafusion::execution::SessionStateBuilder;
34use datafusion::execution::context::SessionContext;
35use datafusion::functions_window::expr_fn::row_number;
36use datafusion_expr::select_expr::SelectExpr;
37use datafusion_expr::{Expr, ExprFunctionExt, SortExpr, col, lit, lit_timestamp_nano, wildcard};
38use query::QueryEngineRef;
39use serde_json::Value as JsonValue;
40use servers::error::{
41    CollectRecordbatchSnafu, DataFusionSnafu, Result as ServerResult, TableNotFoundSnafu,
42};
43use servers::http::jaeger::{JAEGER_QUERY_TABLE_NAME_KEY, QueryTraceParams, TraceUserAgent};
44use servers::otlp::trace::{
45    DURATION_NANO_COLUMN, KEY_OTEL_STATUS_ERROR_KEY, SERVICE_NAME_COLUMN, SPAN_ATTRIBUTES_COLUMN,
46    SPAN_KIND_COLUMN, SPAN_KIND_PREFIX, SPAN_NAME_COLUMN, SPAN_STATUS_CODE, SPAN_STATUS_ERROR,
47    TIMESTAMP_COLUMN, TRACE_ID_COLUMN,
48};
49use servers::query_handler::JaegerQueryHandler;
50use session::context::QueryContextRef;
51use snafu::{OptionExt, ResultExt};
52use table::TableRef;
53use table::requests::{TABLE_DATA_MODEL, TABLE_DATA_MODEL_TRACE_V1};
54use table::table::adapter::DfTableProviderAdapter;
55
56use crate::instance::Instance;
57
/// Default maximum number of rows returned when the caller supplies no limit.
const DEFAULT_LIMIT: usize = 2000;
/// Name of the synthetic row-number column used to rank spans within a trace.
const KEY_RN: &str = "greptime_rn";
60
#[async_trait]
impl JaegerQueryHandler for Instance {
    /// Handles Jaeger `/api/services`: returns every distinct service name
    /// found in the trace table.
    async fn get_services(&self, ctx: QueryContextRef) -> ServerResult<Output> {
        // It's equivalent to `SELECT DISTINCT(service_name) FROM {db}.{trace_table}`.
        // NOTE(review): selecting only `service_name` makes `query_trace_table`
        // route this query to the pre-aggregated trace-services table (see the
        // table_name selection logic there).
        Ok(query_trace_table(
            ctx,
            self,
            vec![SelectExpr::from(col(SERVICE_NAME_COLUMN))],
            vec![], // no filters
            vec![], // no sorts
            None,   // no limit
            None,   // no tag filters
            vec![col(SERVICE_NAME_COLUMN)], // DISTINCT ON (service_name)
        )
        .await?)
    }

    /// Handles Jaeger operation listing: returns distinct operations (span
    /// name + kind) for `service_name`, optionally restricted to one span kind.
    async fn get_operations(
        &self,
        ctx: QueryContextRef,
        service_name: &str,
        span_kind: Option<&str>,
    ) -> ServerResult<Output> {
        let mut filters = vec![col(SERVICE_NAME_COLUMN).eq(lit(service_name))];

        if let Some(span_kind) = span_kind {
            // Jaeger sends lowercase kinds (e.g. "server") while the table stores
            // the OTLP form (e.g. "SPAN_KIND_SERVER"), hence prefix + uppercase.
            filters.push(col(SPAN_KIND_COLUMN).eq(lit(format!(
                "{}{}",
                SPAN_KIND_PREFIX,
                span_kind.to_uppercase()
            ))));
        }

        // It's equivalent to the following SQL query:
        //
        // ```
        // SELECT DISTINCT span_name, span_kind
        // FROM
        //   {db}.{trace_table}
        // WHERE
        //   service_name = '{service_name}' AND
        //   span_kind = '{span_kind}'
        // ORDER BY
        //   span_name ASC
        // ```.
        // NOTE(review): distinct on (span_name, span_kind) makes
        // `query_trace_table` route this to the trace-operations table.
        Ok(query_trace_table(
            ctx,
            self,
            vec![
                SelectExpr::from(col(SPAN_NAME_COLUMN)),
                SelectExpr::from(col(SPAN_KIND_COLUMN)),
                SelectExpr::from(col(SERVICE_NAME_COLUMN)),
                SelectExpr::from(col(TIMESTAMP_COLUMN)),
            ],
            filters,
            vec![col(SPAN_NAME_COLUMN).sort(true, false)], // Sort by span_name in ascending order.
            Some(DEFAULT_LIMIT),
            None,
            vec![col(SPAN_NAME_COLUMN), col(SPAN_KIND_COLUMN)],
        )
        .await?)
    }

    /// Handles Jaeger `/api/traces/{trace_id}`: returns all spans belonging to
    /// one trace, optionally bounded by a time range and a row limit.
    async fn get_trace(
        &self,
        ctx: QueryContextRef,
        trace_id: &str,
        start_time: Option<i64>,
        end_time: Option<i64>,
        limit: Option<usize>,
    ) -> ServerResult<Output> {
        // It's equivalent to the following SQL query:
        //
        // ```
        // SELECT
        //   *
        // FROM
        //   {db}.{trace_table}
        // WHERE
        //   trace_id = '{trace_id}' AND
        //   timestamp >= {start_time} AND
        //   timestamp <= {end_time}
        // ORDER BY
        //   timestamp DESC
        // ```.
        let selects = vec![wildcard()];

        let mut filters = vec![col(TRACE_ID_COLUMN).eq(lit(trace_id))];

        // Time bounds are interpreted as nanoseconds since the epoch
        // (they go through `lit_timestamp_nano`).
        if let Some(start_time) = start_time {
            filters.push(col(TIMESTAMP_COLUMN).gt_eq(lit_timestamp_nano(start_time)));
        }

        if let Some(end_time) = end_time {
            filters.push(col(TIMESTAMP_COLUMN).lt_eq(lit_timestamp_nano(end_time)));
        }

        Ok(query_trace_table(
            ctx,
            self,
            selects,
            filters,
            vec![col(TIMESTAMP_COLUMN).sort(false, false)], // Sort by timestamp in descending order.
            limit,
            None,
            vec![],
        )
        .await?)
    }

    /// Handles Jaeger trace search (`/api/traces?...`): first collects the
    /// distinct trace ids matching all query parameters, then fetches the span
    /// rows for those ids. Grafana clients get a reduced result (up to three
    /// spans per trace) since Grafana only renders trace id and timestamp.
    async fn find_traces(
        &self,
        ctx: QueryContextRef,
        query_params: QueryTraceParams,
    ) -> ServerResult<Output> {
        let mut filters = vec![];

        // `service_name` is already validated in `from_jaeger_query_params()`, so no additional check needed here.
        filters.push(col(SERVICE_NAME_COLUMN).eq(lit(query_params.service_name)));

        if let Some(operation_name) = query_params.operation_name {
            filters.push(col(SPAN_NAME_COLUMN).eq(lit(operation_name)));
        }

        if let Some(start_time) = query_params.start_time {
            filters.push(col(TIMESTAMP_COLUMN).gt_eq(lit_timestamp_nano(start_time)));
        }

        if let Some(end_time) = query_params.end_time {
            filters.push(col(TIMESTAMP_COLUMN).lt_eq(lit_timestamp_nano(end_time)));
        }

        if let Some(min_duration) = query_params.min_duration {
            filters.push(col(DURATION_NANO_COLUMN).gt_eq(lit(min_duration)));
        }

        if let Some(max_duration) = query_params.max_duration {
            filters.push(col(DURATION_NANO_COLUMN).lt_eq(lit(max_duration)));
        }

        // Get all distinct trace ids that match the filters.
        // It's equivalent to the following SQL query:
        //
        // ```
        // SELECT DISTINCT trace_id
        // FROM
        //   {db}.{trace_table}
        // WHERE
        //   service_name = '{service_name}' AND
        //   operation_name = '{operation_name}' AND
        //   timestamp >= {start_time} AND
        //   timestamp <= {end_time} AND
        //   duration >= {min_duration} AND
        //   duration <= {max_duration}
        // LIMIT {limit}
        // ```.
        // NOTE(review): tag filters are applied only in this first query; the
        // follow-up span fetch below re-applies only the time bounds.
        let output = query_trace_table(
            ctx.clone(),
            self,
            vec![wildcard()],
            filters,
            vec![],
            Some(query_params.limit.unwrap_or(DEFAULT_LIMIT)),
            query_params.tags,
            vec![col(TRACE_ID_COLUMN)],
        )
        .await?;

        // Get all traces that match the trace ids from the previous query.
        // It's equivalent to the following SQL query:
        //
        // ```
        // SELECT *
        // FROM
        //   {db}.{trace_table}
        // WHERE
        //   trace_id IN ({trace_ids}) AND
        //   timestamp >= {start_time} AND
        //   timestamp <= {end_time}
        // ```
        // If no trace id matched, the IN-list is empty and the filter matches
        // no rows, producing an empty result.
        let mut filters = vec![
            col(TRACE_ID_COLUMN).in_list(
                trace_ids_from_output(output)
                    .await?
                    .iter()
                    .map(lit)
                    .collect::<Vec<Expr>>(),
                false,
            ),
        ];

        if let Some(start_time) = query_params.start_time {
            filters.push(col(TIMESTAMP_COLUMN).gt_eq(lit_timestamp_nano(start_time)));
        }

        if let Some(end_time) = query_params.end_time {
            filters.push(col(TIMESTAMP_COLUMN).lt_eq(lit_timestamp_nano(end_time)));
        }

        match query_params.user_agent {
            TraceUserAgent::Grafana => {
                // grafana only use trace id and timestamp
                // clicking the trace id will invoke the query trace api
                // so we only need to return 1 span for each trace
                let table_name = ctx
                    .extension(JAEGER_QUERY_TABLE_NAME_KEY)
                    .unwrap_or(TRACE_TABLE_NAME);

                let table = get_table(ctx.clone(), self.catalog_manager(), table_name).await?;

                // Keeps only the earliest (up to) 3 spans of each trace via a
                // row_number window — see `find_traces_rank_3`.
                Ok(find_traces_rank_3(
                    table,
                    self.query_engine(),
                    filters,
                    vec![col(TIMESTAMP_COLUMN).sort(false, false)], // Sort by timestamp in descending order.
                )
                .await?)
            }
            _ => {
                // query all spans
                Ok(query_trace_table(
                    ctx,
                    self,
                    vec![wildcard()],
                    filters,
                    vec![col(TIMESTAMP_COLUMN).sort(false, false)], // Sort by timestamp in descending order.
                    None,
                    None,
                    vec![],
                )
                .await?)
            }
        }
    }
}
296
297#[allow(clippy::too_many_arguments)]
298async fn query_trace_table(
299    ctx: QueryContextRef,
300    instance: &Instance,
301    selects: Vec<SelectExpr>,
302    filters: Vec<Expr>,
303    sorts: Vec<SortExpr>,
304    limit: Option<usize>,
305    tags: Option<HashMap<String, JsonValue>>,
306    distincts: Vec<Expr>,
307) -> ServerResult<Output> {
308    let trace_table_name = ctx
309        .extension(JAEGER_QUERY_TABLE_NAME_KEY)
310        .unwrap_or(TRACE_TABLE_NAME);
311
312    // If only select services, use the trace services table.
313    // If querying operations (distinct by span_name and span_kind), use the trace operations table.
314    let table_name = {
315        if match selects.as_slice() {
316            [SelectExpr::Expression(x)] => x == &col(SERVICE_NAME_COLUMN),
317            _ => false,
318        } {
319            &trace_services_table_name(trace_table_name)
320        } else if !distincts.is_empty()
321            && distincts.contains(&col(SPAN_NAME_COLUMN))
322            && distincts.contains(&col(SPAN_KIND_COLUMN))
323        {
324            &trace_operations_table_name(trace_table_name)
325        } else {
326            trace_table_name
327        }
328    };
329
330    let table = instance
331        .catalog_manager()
332        .table(
333            ctx.current_catalog(),
334            &ctx.current_schema(),
335            table_name,
336            Some(&ctx),
337        )
338        .await?
339        .with_context(|| TableNotFoundSnafu {
340            table: table_name,
341            catalog: ctx.current_catalog(),
342            schema: ctx.current_schema(),
343        })?;
344
345    let is_data_model_v1 = table
346        .clone()
347        .table_info()
348        .meta
349        .options
350        .extra_options
351        .get(TABLE_DATA_MODEL)
352        .map(|s| s.as_str())
353        == Some(TABLE_DATA_MODEL_TRACE_V1);
354
355    // collect to set
356    let col_names = table
357        .table_info()
358        .meta
359        .field_column_names()
360        .map(|s| format!("\"{}\"", s))
361        .collect::<HashSet<String>>();
362
363    let df_context = create_df_context(instance.query_engine())?;
364
365    let dataframe = df_context
366        .read_table(Arc::new(DfTableProviderAdapter::new(table)))
367        .context(DataFusionSnafu)?;
368
369    let dataframe = dataframe.select(selects).context(DataFusionSnafu)?;
370
371    // Apply all filters.
372    let dataframe = filters
373        .into_iter()
374        .chain(tags.map_or(Ok(vec![]), |t| {
375            tags_filters(&dataframe, t, is_data_model_v1, &col_names)
376        })?)
377        .try_fold(dataframe, |df, expr| {
378            df.filter(expr).context(DataFusionSnafu)
379        })?;
380
381    // Apply the distinct if needed.
382    let dataframe = if !distincts.is_empty() {
383        dataframe
384            .distinct_on(distincts.clone(), distincts, None)
385            .context(DataFusionSnafu)?
386    } else {
387        dataframe
388    };
389
390    // Apply the sorts if needed.
391    let dataframe = if !sorts.is_empty() {
392        dataframe.sort(sorts).context(DataFusionSnafu)?
393    } else {
394        dataframe
395    };
396
397    // Apply the limit if needed.
398    let dataframe = if let Some(limit) = limit {
399        dataframe.limit(0, Some(limit)).context(DataFusionSnafu)?
400    } else {
401        dataframe
402    };
403
404    // Execute the query and collect the result.
405    let stream = dataframe.execute_stream().await.context(DataFusionSnafu)?;
406
407    let output = Output::new_with_stream(Box::pin(
408        RecordBatchStreamAdapter::try_new(stream).context(CollectRecordbatchSnafu)?,
409    ));
410
411    Ok(output)
412}
413
414async fn get_table(
415    ctx: QueryContextRef,
416    catalog_manager: &CatalogManagerRef,
417    table_name: &str,
418) -> ServerResult<TableRef> {
419    catalog_manager
420        .table(
421            ctx.current_catalog(),
422            &ctx.current_schema(),
423            table_name,
424            Some(&ctx),
425        )
426        .await?
427        .with_context(|| TableNotFoundSnafu {
428            table: table_name,
429            catalog: ctx.current_catalog(),
430            schema: ctx.current_schema(),
431        })
432}
433
434async fn find_traces_rank_3(
435    table: TableRef,
436    query_engine: &QueryEngineRef,
437    filters: Vec<Expr>,
438    sorts: Vec<SortExpr>,
439) -> ServerResult<Output> {
440    let df_context = create_df_context(query_engine)?;
441
442    let dataframe = df_context
443        .read_table(Arc::new(DfTableProviderAdapter::new(table)))
444        .context(DataFusionSnafu)?;
445
446    let dataframe = dataframe
447        .select(vec![wildcard()])
448        .context(DataFusionSnafu)?;
449
450    // Apply all filters.
451    let dataframe = filters.into_iter().try_fold(dataframe, |df, expr| {
452        df.filter(expr).context(DataFusionSnafu)
453    })?;
454
455    // Apply the sorts if needed.
456    let dataframe = if !sorts.is_empty() {
457        dataframe.sort(sorts).context(DataFusionSnafu)?
458    } else {
459        dataframe
460    };
461
462    // create rank column, for each trace, get the earliest 3 spans
463    let trace_id_col = vec![col(TRACE_ID_COLUMN)];
464    let timestamp_asc = vec![col(TIMESTAMP_COLUMN).sort(true, false)];
465
466    let dataframe = dataframe
467        .with_column(
468            KEY_RN,
469            row_number()
470                .partition_by(trace_id_col)
471                .order_by(timestamp_asc)
472                .build()
473                .context(DataFusionSnafu)?,
474        )
475        .context(DataFusionSnafu)?;
476
477    let dataframe = dataframe
478        .filter(col(KEY_RN).lt_eq(lit(3)))
479        .context(DataFusionSnafu)?;
480
481    // Execute the query and collect the result.
482    let stream = dataframe.execute_stream().await.context(DataFusionSnafu)?;
483
484    let output = Output::new_with_stream(Box::pin(
485        RecordBatchStreamAdapter::try_new(stream).context(CollectRecordbatchSnafu)?,
486    ));
487
488    Ok(output)
489}
490
491// The current implementation registers UDFs during the planning stage, which makes it difficult
492// to utilize them through DataFrame APIs. To address this limitation, we create a new session
493// context and register the required UDFs, allowing them to be decoupled from the global context.
494// TODO(zyy17): Is it possible or necessary to reuse the existing session context?
495fn create_df_context(query_engine: &QueryEngineRef) -> ServerResult<SessionContext> {
496    let df_context = SessionContext::new_with_state(
497        SessionStateBuilder::new_from_existing(query_engine.engine_state().session_state()).build(),
498    );
499
500    // The following JSON UDFs will be used for tags filters on v0 data model.
501    let udfs: Vec<FunctionRef> = vec![
502        Arc::new(JsonGetInt::default()),
503        Arc::new(JsonGetFloat::default()),
504        Arc::new(JsonGetBool::default()),
505        Arc::new(JsonGetString::default()),
506    ];
507
508    for udf in udfs {
509        df_context.register_udf(create_udf(udf));
510    }
511
512    Ok(df_context)
513}
514
515fn json_tag_filters(
516    dataframe: &DataFrame,
517    tags: HashMap<String, JsonValue>,
518) -> ServerResult<Vec<Expr>> {
519    let mut filters = vec![];
520
521    // NOTE: The key of the tags may contain `.`, for example: `http.status_code`, so we need to use `["http.status_code"]` in json path to access the value.
522    for (key, value) in tags.iter() {
523        if let JsonValue::String(value) = value {
524            filters.push(
525                dataframe
526                    .registry()
527                    .udf(JsonGetString::NAME)
528                    .context(DataFusionSnafu)?
529                    .call(vec![
530                        col(SPAN_ATTRIBUTES_COLUMN),
531                        lit(format!("[\"{}\"]", key)),
532                    ])
533                    .eq(lit(value)),
534            );
535        }
536        if let JsonValue::Number(value) = value {
537            if value.is_i64() {
538                filters.push(
539                    dataframe
540                        .registry()
541                        .udf(JsonGetInt::NAME)
542                        .context(DataFusionSnafu)?
543                        .call(vec![
544                            col(SPAN_ATTRIBUTES_COLUMN),
545                            lit(format!("[\"{}\"]", key)),
546                        ])
547                        .eq(lit(value.as_i64().unwrap())),
548                );
549            }
550            if value.is_f64() {
551                filters.push(
552                    dataframe
553                        .registry()
554                        .udf(JsonGetFloat::NAME)
555                        .context(DataFusionSnafu)?
556                        .call(vec![
557                            col(SPAN_ATTRIBUTES_COLUMN),
558                            lit(format!("[\"{}\"]", key)),
559                        ])
560                        .eq(lit(value.as_f64().unwrap())),
561                );
562            }
563        }
564        if let JsonValue::Bool(value) = value {
565            filters.push(
566                dataframe
567                    .registry()
568                    .udf(JsonGetBool::NAME)
569                    .context(DataFusionSnafu)?
570                    .call(vec![
571                        col(SPAN_ATTRIBUTES_COLUMN),
572                        lit(format!("[\"{}\"]", key)),
573                    ])
574                    .eq(lit(*value)),
575            );
576        }
577    }
578
579    Ok(filters)
580}
581
582/// Helper function to check if span_key or resource_key exists in col_names and create an expression.
583/// If neither exists, logs a warning and returns None.
584#[inline]
585fn check_col_and_build_expr<F>(
586    span_key: String,
587    resource_key: String,
588    key: &str,
589    col_names: &HashSet<String>,
590    expr_builder: F,
591) -> Option<Expr>
592where
593    F: FnOnce(String) -> Expr,
594{
595    if col_names.contains(&span_key) {
596        return Some(expr_builder(span_key));
597    }
598    if col_names.contains(&resource_key) {
599        return Some(expr_builder(resource_key));
600    }
601    warn!("tag key {} not found in table columns", key);
602    None
603}
604
605fn flatten_tag_filters(
606    tags: HashMap<String, JsonValue>,
607    col_names: &HashSet<String>,
608) -> ServerResult<Vec<Expr>> {
609    let filters = tags
610        .into_iter()
611        .filter_map(|(key, value)| {
612            if key == KEY_OTEL_STATUS_ERROR_KEY && value == JsonValue::Bool(true) {
613                return Some(col(SPAN_STATUS_CODE).eq(lit(SPAN_STATUS_ERROR)));
614            }
615
616            // TODO(shuiyisong): add more precise mapping from key to col name
617            let span_key = format!("\"span_attributes.{}\"", key);
618            let resource_key = format!("\"resource_attributes.{}\"", key);
619            match value {
620                JsonValue::String(value) => {
621                    check_col_and_build_expr(span_key, resource_key, &key, col_names, |k| {
622                        col(k).eq(lit(value))
623                    })
624                }
625                JsonValue::Number(value) => {
626                    if value.is_f64() {
627                        // safe to unwrap as checked previously
628                        let value = value.as_f64().unwrap();
629                        check_col_and_build_expr(span_key, resource_key, &key, col_names, |k| {
630                            col(k).eq(lit(value))
631                        })
632                    } else {
633                        let value = value.as_i64().unwrap();
634                        check_col_and_build_expr(span_key, resource_key, &key, col_names, |k| {
635                            col(k).eq(lit(value))
636                        })
637                    }
638                }
639                JsonValue::Bool(value) => {
640                    check_col_and_build_expr(span_key, resource_key, &key, col_names, |k| {
641                        col(k).eq(lit(value))
642                    })
643                }
644                JsonValue::Null => {
645                    check_col_and_build_expr(span_key, resource_key, &key, col_names, |k| {
646                        col(k).is_null()
647                    })
648                }
649                // not supported at the moment
650                JsonValue::Array(_value) => None,
651                JsonValue::Object(_value) => None,
652            }
653        })
654        .collect();
655    Ok(filters)
656}
657
658fn tags_filters(
659    dataframe: &DataFrame,
660    tags: HashMap<String, JsonValue>,
661    is_data_model_v1: bool,
662    col_names: &HashSet<String>,
663) -> ServerResult<Vec<Expr>> {
664    if is_data_model_v1 {
665        flatten_tag_filters(tags, col_names)
666    } else {
667        json_tag_filters(dataframe, tags)
668    }
669}
670
671// Get trace ids from the output in recordbatches.
672async fn trace_ids_from_output(output: Output) -> ServerResult<Vec<String>> {
673    if let OutputData::Stream(stream) = output.data {
674        let schema = stream.schema().clone();
675        let recordbatches = util::collect(stream)
676            .await
677            .context(CollectRecordbatchSnafu)?;
678
679        // Only contains `trace_id` column in string type.
680        if !recordbatches.is_empty()
681            && schema.num_columns() == 1
682            && schema.contains_column(TRACE_ID_COLUMN)
683        {
684            let mut trace_ids = vec![];
685            for recordbatch in recordbatches {
686                recordbatch
687                    .iter_column_as_string(0)
688                    .flatten()
689                    .for_each(|x| trace_ids.push(x));
690            }
691
692            return Ok(trace_ids);
693        }
694    }
695
696    Ok(vec![])
697}