frontend/instance/
jaeger.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{HashMap, HashSet};
16use std::sync::Arc;
17
18use async_trait::async_trait;
19use catalog::CatalogManagerRef;
20use common_catalog::consts::{
21    TRACE_TABLE_NAME, trace_operations_table_name, trace_services_table_name,
22};
23use common_function::function::FunctionRef;
24use common_function::scalars::json::json_get::{
25    JsonGetBool, JsonGetFloat, JsonGetInt, JsonGetString,
26};
27use common_function::scalars::udf::create_udf;
28use common_query::{Output, OutputData};
29use common_recordbatch::adapter::RecordBatchStreamAdapter;
30use common_recordbatch::util;
31use common_telemetry::warn;
32use datafusion::dataframe::DataFrame;
33use datafusion::execution::SessionStateBuilder;
34use datafusion::execution::context::SessionContext;
35use datafusion::functions_window::expr_fn::row_number;
36use datafusion_expr::select_expr::SelectExpr;
37use datafusion_expr::{Expr, ExprFunctionExt, SortExpr, col, lit, lit_timestamp_nano, wildcard};
38use query::QueryEngineRef;
39use serde_json::Value as JsonValue;
40use servers::error::{
41    CatalogSnafu, CollectRecordbatchSnafu, DataFusionSnafu, Result as ServerResult,
42    TableNotFoundSnafu,
43};
44use servers::http::jaeger::{JAEGER_QUERY_TABLE_NAME_KEY, QueryTraceParams, TraceUserAgent};
45use servers::otlp::trace::{
46    DURATION_NANO_COLUMN, KEY_OTEL_STATUS_ERROR_KEY, SERVICE_NAME_COLUMN, SPAN_ATTRIBUTES_COLUMN,
47    SPAN_KIND_COLUMN, SPAN_KIND_PREFIX, SPAN_NAME_COLUMN, SPAN_STATUS_CODE, SPAN_STATUS_ERROR,
48    TIMESTAMP_COLUMN, TRACE_ID_COLUMN,
49};
50use servers::query_handler::JaegerQueryHandler;
51use session::context::QueryContextRef;
52use snafu::{OptionExt, ResultExt};
53use table::TableRef;
54use table::requests::{TABLE_DATA_MODEL, TABLE_DATA_MODEL_TRACE_V1};
55use table::table::adapter::DfTableProviderAdapter;
56
57use crate::instance::Instance;
58
/// Row limit used when a query does not specify one: the cap on operations returned by
/// `get_operations` and the fallback number of trace ids selected by `find_traces`.
const DEFAULT_LIMIT: usize = 2_000;
/// Name of the per-trace `row_number()` column added by `find_traces_rank_3`.
const KEY_RN: &str = "greptime_rn";
61
62#[async_trait]
63impl JaegerQueryHandler for Instance {
64    async fn get_services(&self, ctx: QueryContextRef) -> ServerResult<Output> {
65        // It's equivalent to `SELECT DISTINCT(service_name) FROM {db}.{trace_table}`.
66        Ok(query_trace_table(
67            ctx,
68            self,
69            vec![SelectExpr::from(col(SERVICE_NAME_COLUMN))],
70            vec![],
71            vec![],
72            None,
73            None,
74            vec![col(SERVICE_NAME_COLUMN)],
75        )
76        .await?)
77    }
78
79    async fn get_operations(
80        &self,
81        ctx: QueryContextRef,
82        service_name: &str,
83        span_kind: Option<&str>,
84    ) -> ServerResult<Output> {
85        let mut filters = vec![col(SERVICE_NAME_COLUMN).eq(lit(service_name))];
86
87        if let Some(span_kind) = span_kind {
88            filters.push(col(SPAN_KIND_COLUMN).eq(lit(format!(
89                "{}{}",
90                SPAN_KIND_PREFIX,
91                span_kind.to_uppercase()
92            ))));
93        }
94
95        // It's equivalent to the following SQL query:
96        //
97        // ```
98        // SELECT DISTINCT span_name, span_kind
99        // FROM
100        //   {db}.{trace_table}
101        // WHERE
102        //   service_name = '{service_name}' AND
103        //   span_kind = '{span_kind}'
104        // ORDER BY
105        //   span_name ASC
106        // ```.
107        Ok(query_trace_table(
108            ctx,
109            self,
110            vec![
111                SelectExpr::from(col(SPAN_NAME_COLUMN)),
112                SelectExpr::from(col(SPAN_KIND_COLUMN)),
113                SelectExpr::from(col(SERVICE_NAME_COLUMN)),
114                SelectExpr::from(col(TIMESTAMP_COLUMN)),
115            ],
116            filters,
117            vec![col(SPAN_NAME_COLUMN).sort(true, false)], // Sort by span_name in ascending order.
118            Some(DEFAULT_LIMIT),
119            None,
120            vec![col(SPAN_NAME_COLUMN), col(SPAN_KIND_COLUMN)],
121        )
122        .await?)
123    }
124
125    async fn get_trace(
126        &self,
127        ctx: QueryContextRef,
128        trace_id: &str,
129        start_time: Option<i64>,
130        end_time: Option<i64>,
131        limit: Option<usize>,
132    ) -> ServerResult<Output> {
133        // It's equivalent to the following SQL query:
134        //
135        // ```
136        // SELECT
137        //   *
138        // FROM
139        //   {db}.{trace_table}
140        // WHERE
141        //   trace_id = '{trace_id}' AND
142        //   timestamp >= {start_time} AND
143        //   timestamp <= {end_time}
144        // ORDER BY
145        //   timestamp DESC
146        // ```.
147        let selects = vec![wildcard()];
148
149        let mut filters = vec![col(TRACE_ID_COLUMN).eq(lit(trace_id))];
150
151        if let Some(start_time) = start_time {
152            filters.push(col(TIMESTAMP_COLUMN).gt_eq(lit_timestamp_nano(start_time)));
153        }
154
155        if let Some(end_time) = end_time {
156            filters.push(col(TIMESTAMP_COLUMN).lt_eq(lit_timestamp_nano(end_time)));
157        }
158
159        Ok(query_trace_table(
160            ctx,
161            self,
162            selects,
163            filters,
164            vec![col(TIMESTAMP_COLUMN).sort(false, false)], // Sort by timestamp in descending order.
165            limit,
166            None,
167            vec![],
168        )
169        .await?)
170    }
171
172    async fn find_traces(
173        &self,
174        ctx: QueryContextRef,
175        query_params: QueryTraceParams,
176    ) -> ServerResult<Output> {
177        let mut filters = vec![];
178
179        // `service_name` is already validated in `from_jaeger_query_params()`, so no additional check needed here.
180        filters.push(col(SERVICE_NAME_COLUMN).eq(lit(query_params.service_name)));
181
182        if let Some(operation_name) = query_params.operation_name {
183            filters.push(col(SPAN_NAME_COLUMN).eq(lit(operation_name)));
184        }
185
186        if let Some(start_time) = query_params.start_time {
187            filters.push(col(TIMESTAMP_COLUMN).gt_eq(lit_timestamp_nano(start_time)));
188        }
189
190        if let Some(end_time) = query_params.end_time {
191            filters.push(col(TIMESTAMP_COLUMN).lt_eq(lit_timestamp_nano(end_time)));
192        }
193
194        if let Some(min_duration) = query_params.min_duration {
195            filters.push(col(DURATION_NANO_COLUMN).gt_eq(lit(min_duration)));
196        }
197
198        if let Some(max_duration) = query_params.max_duration {
199            filters.push(col(DURATION_NANO_COLUMN).lt_eq(lit(max_duration)));
200        }
201
202        // Get all distinct trace ids that match the filters.
203        // It's equivalent to the following SQL query:
204        //
205        // ```
206        // SELECT DISTINCT trace_id
207        // FROM
208        //   {db}.{trace_table}
209        // WHERE
210        //   service_name = '{service_name}' AND
211        //   operation_name = '{operation_name}' AND
212        //   timestamp >= {start_time} AND
213        //   timestamp <= {end_time} AND
214        //   duration >= {min_duration} AND
215        //   duration <= {max_duration}
216        // LIMIT {limit}
217        // ```.
218        let output = query_trace_table(
219            ctx.clone(),
220            self,
221            vec![wildcard()],
222            filters,
223            vec![],
224            Some(query_params.limit.unwrap_or(DEFAULT_LIMIT)),
225            query_params.tags,
226            vec![col(TRACE_ID_COLUMN)],
227        )
228        .await?;
229
230        // Get all traces that match the trace ids from the previous query.
231        // It's equivalent to the following SQL query:
232        //
233        // ```
234        // SELECT *
235        // FROM
236        //   {db}.{trace_table}
237        // WHERE
238        //   trace_id IN ({trace_ids}) AND
239        //   timestamp >= {start_time} AND
240        //   timestamp <= {end_time}
241        // ```
242        let mut filters = vec![
243            col(TRACE_ID_COLUMN).in_list(
244                trace_ids_from_output(output)
245                    .await?
246                    .iter()
247                    .map(lit)
248                    .collect::<Vec<Expr>>(),
249                false,
250            ),
251        ];
252
253        if let Some(start_time) = query_params.start_time {
254            filters.push(col(TIMESTAMP_COLUMN).gt_eq(lit_timestamp_nano(start_time)));
255        }
256
257        if let Some(end_time) = query_params.end_time {
258            filters.push(col(TIMESTAMP_COLUMN).lt_eq(lit_timestamp_nano(end_time)));
259        }
260
261        match query_params.user_agent {
262            TraceUserAgent::Grafana => {
263                // grafana only use trace id and timestamp
264                // clicking the trace id will invoke the query trace api
265                // so we only need to return 1 span for each trace
266                let table_name = ctx
267                    .extension(JAEGER_QUERY_TABLE_NAME_KEY)
268                    .unwrap_or(TRACE_TABLE_NAME);
269
270                let table = get_table(ctx.clone(), self.catalog_manager(), table_name).await?;
271
272                Ok(find_traces_rank_3(
273                    table,
274                    self.query_engine(),
275                    filters,
276                    vec![col(TIMESTAMP_COLUMN).sort(false, false)], // Sort by timestamp in descending order.
277                )
278                .await?)
279            }
280            _ => {
281                // query all spans
282                Ok(query_trace_table(
283                    ctx,
284                    self,
285                    vec![wildcard()],
286                    filters,
287                    vec![col(TIMESTAMP_COLUMN).sort(false, false)], // Sort by timestamp in descending order.
288                    None,
289                    None,
290                    vec![],
291                )
292                .await?)
293            }
294        }
295    }
296}
297
298#[allow(clippy::too_many_arguments)]
299async fn query_trace_table(
300    ctx: QueryContextRef,
301    instance: &Instance,
302    selects: Vec<SelectExpr>,
303    filters: Vec<Expr>,
304    sorts: Vec<SortExpr>,
305    limit: Option<usize>,
306    tags: Option<HashMap<String, JsonValue>>,
307    distincts: Vec<Expr>,
308) -> ServerResult<Output> {
309    let trace_table_name = ctx
310        .extension(JAEGER_QUERY_TABLE_NAME_KEY)
311        .unwrap_or(TRACE_TABLE_NAME);
312
313    // If only select services, use the trace services table.
314    // If querying operations (distinct by span_name and span_kind), use the trace operations table.
315    let table_name = {
316        if match selects.as_slice() {
317            [SelectExpr::Expression(x)] => x == &col(SERVICE_NAME_COLUMN),
318            _ => false,
319        } {
320            &trace_services_table_name(trace_table_name)
321        } else if !distincts.is_empty()
322            && distincts.contains(&col(SPAN_NAME_COLUMN))
323            && distincts.contains(&col(SPAN_KIND_COLUMN))
324        {
325            &trace_operations_table_name(trace_table_name)
326        } else {
327            trace_table_name
328        }
329    };
330
331    let table = instance
332        .catalog_manager()
333        .table(
334            ctx.current_catalog(),
335            &ctx.current_schema(),
336            table_name,
337            Some(&ctx),
338        )
339        .await
340        .context(CatalogSnafu)?
341        .with_context(|| TableNotFoundSnafu {
342            table: table_name,
343            catalog: ctx.current_catalog(),
344            schema: ctx.current_schema(),
345        })?;
346
347    let is_data_model_v1 = table
348        .clone()
349        .table_info()
350        .meta
351        .options
352        .extra_options
353        .get(TABLE_DATA_MODEL)
354        .map(|s| s.as_str())
355        == Some(TABLE_DATA_MODEL_TRACE_V1);
356
357    // collect to set
358    let col_names = table
359        .table_info()
360        .meta
361        .field_column_names()
362        .map(|s| format!("\"{}\"", s))
363        .collect::<HashSet<String>>();
364
365    let df_context = create_df_context(instance.query_engine())?;
366
367    let dataframe = df_context
368        .read_table(Arc::new(DfTableProviderAdapter::new(table)))
369        .context(DataFusionSnafu)?;
370
371    let dataframe = dataframe.select(selects).context(DataFusionSnafu)?;
372
373    // Apply all filters.
374    let dataframe = filters
375        .into_iter()
376        .chain(tags.map_or(Ok(vec![]), |t| {
377            tags_filters(&dataframe, t, is_data_model_v1, &col_names)
378        })?)
379        .try_fold(dataframe, |df, expr| {
380            df.filter(expr).context(DataFusionSnafu)
381        })?;
382
383    // Apply the distinct if needed.
384    let dataframe = if !distincts.is_empty() {
385        dataframe
386            .distinct_on(distincts.clone(), distincts, None)
387            .context(DataFusionSnafu)?
388    } else {
389        dataframe
390    };
391
392    // Apply the sorts if needed.
393    let dataframe = if !sorts.is_empty() {
394        dataframe.sort(sorts).context(DataFusionSnafu)?
395    } else {
396        dataframe
397    };
398
399    // Apply the limit if needed.
400    let dataframe = if let Some(limit) = limit {
401        dataframe.limit(0, Some(limit)).context(DataFusionSnafu)?
402    } else {
403        dataframe
404    };
405
406    // Execute the query and collect the result.
407    let stream = dataframe.execute_stream().await.context(DataFusionSnafu)?;
408
409    let output = Output::new_with_stream(Box::pin(
410        RecordBatchStreamAdapter::try_new(stream).context(CollectRecordbatchSnafu)?,
411    ));
412
413    Ok(output)
414}
415
416async fn get_table(
417    ctx: QueryContextRef,
418    catalog_manager: &CatalogManagerRef,
419    table_name: &str,
420) -> ServerResult<TableRef> {
421    catalog_manager
422        .table(
423            ctx.current_catalog(),
424            &ctx.current_schema(),
425            table_name,
426            Some(&ctx),
427        )
428        .await
429        .context(CatalogSnafu)?
430        .with_context(|| TableNotFoundSnafu {
431            table: table_name,
432            catalog: ctx.current_catalog(),
433            schema: ctx.current_schema(),
434        })
435}
436
437async fn find_traces_rank_3(
438    table: TableRef,
439    query_engine: &QueryEngineRef,
440    filters: Vec<Expr>,
441    sorts: Vec<SortExpr>,
442) -> ServerResult<Output> {
443    let df_context = create_df_context(query_engine)?;
444
445    let dataframe = df_context
446        .read_table(Arc::new(DfTableProviderAdapter::new(table)))
447        .context(DataFusionSnafu)?;
448
449    let dataframe = dataframe
450        .select(vec![wildcard()])
451        .context(DataFusionSnafu)?;
452
453    // Apply all filters.
454    let dataframe = filters.into_iter().try_fold(dataframe, |df, expr| {
455        df.filter(expr).context(DataFusionSnafu)
456    })?;
457
458    // Apply the sorts if needed.
459    let dataframe = if !sorts.is_empty() {
460        dataframe.sort(sorts).context(DataFusionSnafu)?
461    } else {
462        dataframe
463    };
464
465    // create rank column, for each trace, get the earliest 3 spans
466    let trace_id_col = vec![col(TRACE_ID_COLUMN)];
467    let timestamp_asc = vec![col(TIMESTAMP_COLUMN).sort(true, false)];
468
469    let dataframe = dataframe
470        .with_column(
471            KEY_RN,
472            row_number()
473                .partition_by(trace_id_col)
474                .order_by(timestamp_asc)
475                .build()
476                .context(DataFusionSnafu)?,
477        )
478        .context(DataFusionSnafu)?;
479
480    let dataframe = dataframe
481        .filter(col(KEY_RN).lt_eq(lit(3)))
482        .context(DataFusionSnafu)?;
483
484    // Execute the query and collect the result.
485    let stream = dataframe.execute_stream().await.context(DataFusionSnafu)?;
486
487    let output = Output::new_with_stream(Box::pin(
488        RecordBatchStreamAdapter::try_new(stream).context(CollectRecordbatchSnafu)?,
489    ));
490
491    Ok(output)
492}
493
494// The current implementation registers UDFs during the planning stage, which makes it difficult
495// to utilize them through DataFrame APIs. To address this limitation, we create a new session
496// context and register the required UDFs, allowing them to be decoupled from the global context.
497// TODO(zyy17): Is it possible or necessary to reuse the existing session context?
498fn create_df_context(query_engine: &QueryEngineRef) -> ServerResult<SessionContext> {
499    let df_context = SessionContext::new_with_state(
500        SessionStateBuilder::new_from_existing(query_engine.engine_state().session_state()).build(),
501    );
502
503    // The following JSON UDFs will be used for tags filters on v0 data model.
504    let udfs: Vec<FunctionRef> = vec![
505        Arc::new(JsonGetInt::default()),
506        Arc::new(JsonGetFloat::default()),
507        Arc::new(JsonGetBool::default()),
508        Arc::new(JsonGetString::default()),
509    ];
510
511    for udf in udfs {
512        df_context.register_udf(create_udf(udf));
513    }
514
515    Ok(df_context)
516}
517
518fn json_tag_filters(
519    dataframe: &DataFrame,
520    tags: HashMap<String, JsonValue>,
521) -> ServerResult<Vec<Expr>> {
522    let mut filters = vec![];
523
524    // NOTE: The key of the tags may contain `.`, for example: `http.status_code`, so we need to use `["http.status_code"]` in json path to access the value.
525    for (key, value) in tags.iter() {
526        if let JsonValue::String(value) = value {
527            filters.push(
528                dataframe
529                    .registry()
530                    .udf(JsonGetString::NAME)
531                    .context(DataFusionSnafu)?
532                    .call(vec![
533                        col(SPAN_ATTRIBUTES_COLUMN),
534                        lit(format!("[\"{}\"]", key)),
535                    ])
536                    .eq(lit(value)),
537            );
538        }
539        if let JsonValue::Number(value) = value {
540            if value.is_i64() {
541                filters.push(
542                    dataframe
543                        .registry()
544                        .udf(JsonGetInt::NAME)
545                        .context(DataFusionSnafu)?
546                        .call(vec![
547                            col(SPAN_ATTRIBUTES_COLUMN),
548                            lit(format!("[\"{}\"]", key)),
549                        ])
550                        .eq(lit(value.as_i64().unwrap())),
551                );
552            }
553            if value.is_f64() {
554                filters.push(
555                    dataframe
556                        .registry()
557                        .udf(JsonGetFloat::NAME)
558                        .context(DataFusionSnafu)?
559                        .call(vec![
560                            col(SPAN_ATTRIBUTES_COLUMN),
561                            lit(format!("[\"{}\"]", key)),
562                        ])
563                        .eq(lit(value.as_f64().unwrap())),
564                );
565            }
566        }
567        if let JsonValue::Bool(value) = value {
568            filters.push(
569                dataframe
570                    .registry()
571                    .udf(JsonGetBool::NAME)
572                    .context(DataFusionSnafu)?
573                    .call(vec![
574                        col(SPAN_ATTRIBUTES_COLUMN),
575                        lit(format!("[\"{}\"]", key)),
576                    ])
577                    .eq(lit(*value)),
578            );
579        }
580    }
581
582    Ok(filters)
583}
584
585/// Helper function to check if span_key or resource_key exists in col_names and create an expression.
586/// If neither exists, logs a warning and returns None.
587#[inline]
588fn check_col_and_build_expr<F>(
589    span_key: String,
590    resource_key: String,
591    key: &str,
592    col_names: &HashSet<String>,
593    expr_builder: F,
594) -> Option<Expr>
595where
596    F: FnOnce(String) -> Expr,
597{
598    if col_names.contains(&span_key) {
599        return Some(expr_builder(span_key));
600    }
601    if col_names.contains(&resource_key) {
602        return Some(expr_builder(resource_key));
603    }
604    warn!("tag key {} not found in table columns", key);
605    None
606}
607
608fn flatten_tag_filters(
609    tags: HashMap<String, JsonValue>,
610    col_names: &HashSet<String>,
611) -> ServerResult<Vec<Expr>> {
612    let filters = tags
613        .into_iter()
614        .filter_map(|(key, value)| {
615            if key == KEY_OTEL_STATUS_ERROR_KEY && value == JsonValue::Bool(true) {
616                return Some(col(SPAN_STATUS_CODE).eq(lit(SPAN_STATUS_ERROR)));
617            }
618
619            // TODO(shuiyisong): add more precise mapping from key to col name
620            let span_key = format!("\"span_attributes.{}\"", key);
621            let resource_key = format!("\"resource_attributes.{}\"", key);
622            match value {
623                JsonValue::String(value) => {
624                    check_col_and_build_expr(span_key, resource_key, &key, col_names, |k| {
625                        col(k).eq(lit(value))
626                    })
627                }
628                JsonValue::Number(value) => {
629                    if value.is_f64() {
630                        // safe to unwrap as checked previously
631                        let value = value.as_f64().unwrap();
632                        check_col_and_build_expr(span_key, resource_key, &key, col_names, |k| {
633                            col(k).eq(lit(value))
634                        })
635                    } else {
636                        let value = value.as_i64().unwrap();
637                        check_col_and_build_expr(span_key, resource_key, &key, col_names, |k| {
638                            col(k).eq(lit(value))
639                        })
640                    }
641                }
642                JsonValue::Bool(value) => {
643                    check_col_and_build_expr(span_key, resource_key, &key, col_names, |k| {
644                        col(k).eq(lit(value))
645                    })
646                }
647                JsonValue::Null => {
648                    check_col_and_build_expr(span_key, resource_key, &key, col_names, |k| {
649                        col(k).is_null()
650                    })
651                }
652                // not supported at the moment
653                JsonValue::Array(_value) => None,
654                JsonValue::Object(_value) => None,
655            }
656        })
657        .collect();
658    Ok(filters)
659}
660
661fn tags_filters(
662    dataframe: &DataFrame,
663    tags: HashMap<String, JsonValue>,
664    is_data_model_v1: bool,
665    col_names: &HashSet<String>,
666) -> ServerResult<Vec<Expr>> {
667    if is_data_model_v1 {
668        flatten_tag_filters(tags, col_names)
669    } else {
670        json_tag_filters(dataframe, tags)
671    }
672}
673
674// Get trace ids from the output in recordbatches.
675async fn trace_ids_from_output(output: Output) -> ServerResult<Vec<String>> {
676    if let OutputData::Stream(stream) = output.data {
677        let schema = stream.schema().clone();
678        let recordbatches = util::collect(stream)
679            .await
680            .context(CollectRecordbatchSnafu)?;
681
682        // Only contains `trace_id` column in string type.
683        if !recordbatches.is_empty()
684            && schema.num_columns() == 1
685            && schema.contains_column(TRACE_ID_COLUMN)
686        {
687            let mut trace_ids = vec![];
688            for recordbatch in recordbatches {
689                recordbatch
690                    .iter_column_as_string(0)
691                    .flatten()
692                    .for_each(|x| trace_ids.push(x));
693            }
694
695            return Ok(trace_ids);
696        }
697    }
698
699    Ok(vec![])
700}