// frontend/instance/jaeger.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{HashMap, HashSet};
16use std::sync::Arc;
17
18use async_trait::async_trait;
19use catalog::CatalogManagerRef;
20use common_catalog::consts::{
21    TRACE_TABLE_NAME, trace_operations_table_name, trace_services_table_name,
22};
23use common_function::function::FunctionRef;
24use common_function::scalars::json::json_get::{
25    JsonGetBool, JsonGetFloat, JsonGetInt, JsonGetString,
26};
27use common_function::scalars::udf::create_udf;
28use common_query::{Output, OutputData};
29use common_recordbatch::adapter::RecordBatchStreamAdapter;
30use common_recordbatch::util;
31use common_telemetry::warn;
32use datafusion::dataframe::DataFrame;
33use datafusion::execution::SessionStateBuilder;
34use datafusion::execution::context::SessionContext;
35use datafusion::functions_window::expr_fn::row_number;
36use datafusion_expr::select_expr::SelectExpr;
37use datafusion_expr::{Expr, ExprFunctionExt, SortExpr, col, lit, lit_timestamp_nano, wildcard};
38use query::QueryEngineRef;
39use serde_json::Value as JsonValue;
40use servers::error::{
41    CatalogSnafu, CollectRecordbatchSnafu, DataFusionSnafu, Result as ServerResult,
42    TableNotFoundSnafu,
43};
44use servers::http::jaeger::{JAEGER_QUERY_TABLE_NAME_KEY, QueryTraceParams, TraceUserAgent};
45use servers::otlp::trace::{
46    DURATION_NANO_COLUMN, KEY_OTEL_STATUS_ERROR_KEY, SERVICE_NAME_COLUMN, SPAN_ATTRIBUTES_COLUMN,
47    SPAN_KIND_COLUMN, SPAN_KIND_PREFIX, SPAN_NAME_COLUMN, SPAN_STATUS_CODE, SPAN_STATUS_ERROR,
48    TIMESTAMP_COLUMN, TRACE_ID_COLUMN,
49};
50use servers::query_handler::JaegerQueryHandler;
51use session::context::QueryContextRef;
52use snafu::{OptionExt, ResultExt};
53use table::TableRef;
54use table::requests::{TABLE_DATA_MODEL, TABLE_DATA_MODEL_TRACE_V1};
55use table::table::adapter::DfTableProviderAdapter;
56
57use crate::instance::Instance;
58
/// Default row limit applied when a Jaeger query does not specify one.
const DEFAULT_LIMIT: usize = 2000;
/// Name of the synthetic row-number column used to rank spans within a trace.
const KEY_RN: &str = "greptime_rn";
61
#[async_trait]
impl JaegerQueryHandler for Instance {
    /// Returns every distinct service name stored in the trace table.
    ///
    /// Selecting only `service_name` makes `query_trace_table` route the query
    /// to the dedicated trace-services table instead of the full trace table.
    async fn get_services(&self, ctx: QueryContextRef) -> ServerResult<Output> {
        // It's equivalent to `SELECT DISTINCT(service_name) FROM {db}.{trace_table}`.
        Ok(query_trace_table(
            ctx,
            self.catalog_manager(),
            self.query_engine(),
            vec![SelectExpr::from(col(SERVICE_NAME_COLUMN))],
            vec![],
            vec![],
            None,
            None,
            vec![col(SERVICE_NAME_COLUMN)],
        )
        .await?)
    }

    /// Returns the distinct (`span_name`, `span_kind`) pairs for a service,
    /// optionally restricted to one span kind.
    async fn get_operations(
        &self,
        ctx: QueryContextRef,
        service_name: &str,
        span_kind: Option<&str>,
    ) -> ServerResult<Output> {
        let mut filters = vec![col(SERVICE_NAME_COLUMN).eq(lit(service_name))];

        if let Some(span_kind) = span_kind {
            // Jaeger sends lowercase kinds (e.g. "server"); stored span kinds
            // are uppercased and carry `SPAN_KIND_PREFIX`.
            filters.push(col(SPAN_KIND_COLUMN).eq(lit(format!(
                "{}{}",
                SPAN_KIND_PREFIX,
                span_kind.to_uppercase()
            ))));
        }

        // It's equivalent to the following SQL query:
        //
        // ```
        // SELECT DISTINCT span_name, span_kind
        // FROM
        //   {db}.{trace_table}
        // WHERE
        //   service_name = '{service_name}' AND
        //   span_kind = '{span_kind}'
        // ORDER BY
        //   span_name ASC
        // ```.
        //
        // The distinct on (span_name, span_kind) makes `query_trace_table`
        // route the query to the trace-operations table.
        Ok(query_trace_table(
            ctx,
            self.catalog_manager(),
            self.query_engine(),
            vec![
                SelectExpr::from(col(SPAN_NAME_COLUMN)),
                SelectExpr::from(col(SPAN_KIND_COLUMN)),
                SelectExpr::from(col(SERVICE_NAME_COLUMN)),
                SelectExpr::from(col(TIMESTAMP_COLUMN)),
            ],
            filters,
            vec![col(SPAN_NAME_COLUMN).sort(true, false)], // Sort by span_name in ascending order.
            Some(DEFAULT_LIMIT),
            None,
            vec![col(SPAN_NAME_COLUMN), col(SPAN_KIND_COLUMN)],
        )
        .await?)
    }

    /// Returns all spans of a single trace, optionally bounded by a time range
    /// and a row limit.
    async fn get_trace(
        &self,
        ctx: QueryContextRef,
        trace_id: &str,
        start_time: Option<i64>,
        end_time: Option<i64>,
        limit: Option<usize>,
    ) -> ServerResult<Output> {
        // It's equivalent to the following SQL query:
        //
        // ```
        // SELECT
        //   *
        // FROM
        //   {db}.{trace_table}
        // WHERE
        //   trace_id = '{trace_id}' AND
        //   timestamp >= {start_time} AND
        //   timestamp <= {end_time}
        // ORDER BY
        //   timestamp DESC
        // ```.
        let selects = vec![wildcard()];

        let mut filters = vec![col(TRACE_ID_COLUMN).eq(lit(trace_id))];

        // Time bounds are optional; `start_time`/`end_time` are nanosecond timestamps.
        if let Some(start_time) = start_time {
            filters.push(col(TIMESTAMP_COLUMN).gt_eq(lit_timestamp_nano(start_time)));
        }

        if let Some(end_time) = end_time {
            filters.push(col(TIMESTAMP_COLUMN).lt_eq(lit_timestamp_nano(end_time)));
        }

        Ok(query_trace_table(
            ctx,
            self.catalog_manager(),
            self.query_engine(),
            selects,
            filters,
            vec![col(TIMESTAMP_COLUMN).sort(false, false)], // Sort by timestamp in descending order.
            limit,
            None,
            vec![],
        )
        .await?)
    }

    /// Searches for traces matching the given query parameters.
    ///
    /// Runs in two phases: first collect the distinct trace ids that match the
    /// filters (service, operation, time range, duration range, tags), then
    /// fetch the spans for those trace ids.
    async fn find_traces(
        &self,
        ctx: QueryContextRef,
        query_params: QueryTraceParams,
    ) -> ServerResult<Output> {
        let mut filters = vec![];

        // `service_name` is already validated in `from_jaeger_query_params()`, so no additional check needed here.
        filters.push(col(SERVICE_NAME_COLUMN).eq(lit(query_params.service_name)));

        if let Some(operation_name) = query_params.operation_name {
            filters.push(col(SPAN_NAME_COLUMN).eq(lit(operation_name)));
        }

        if let Some(start_time) = query_params.start_time {
            filters.push(col(TIMESTAMP_COLUMN).gt_eq(lit_timestamp_nano(start_time)));
        }

        if let Some(end_time) = query_params.end_time {
            filters.push(col(TIMESTAMP_COLUMN).lt_eq(lit_timestamp_nano(end_time)));
        }

        if let Some(min_duration) = query_params.min_duration {
            filters.push(col(DURATION_NANO_COLUMN).gt_eq(lit(min_duration)));
        }

        if let Some(max_duration) = query_params.max_duration {
            filters.push(col(DURATION_NANO_COLUMN).lt_eq(lit(max_duration)));
        }

        // Get all distinct trace ids that match the filters.
        // It's equivalent to the following SQL query:
        //
        // ```
        // SELECT DISTINCT trace_id
        // FROM
        //   {db}.{trace_table}
        // WHERE
        //   service_name = '{service_name}' AND
        //   operation_name = '{operation_name}' AND
        //   timestamp >= {start_time} AND
        //   timestamp <= {end_time} AND
        //   duration >= {min_duration} AND
        //   duration <= {max_duration}
        // LIMIT {limit}
        // ```.
        let output = query_trace_table(
            ctx.clone(),
            self.catalog_manager(),
            self.query_engine(),
            vec![wildcard()],
            filters,
            vec![],
            Some(query_params.limit.unwrap_or(DEFAULT_LIMIT)),
            query_params.tags,
            vec![col(TRACE_ID_COLUMN)],
        )
        .await?;

        // Get all traces that match the trace ids from the previous query.
        // It's equivalent to the following SQL query:
        //
        // ```
        // SELECT *
        // FROM
        //   {db}.{trace_table}
        // WHERE
        //   trace_id IN ({trace_ids}) AND
        //   timestamp >= {start_time} AND
        //   timestamp <= {end_time}
        // ```
        let mut filters = vec![
            col(TRACE_ID_COLUMN).in_list(
                trace_ids_from_output(output)
                    .await?
                    .iter()
                    .map(lit)
                    .collect::<Vec<Expr>>(),
                false,
            ),
        ];

        // Re-apply the time bounds so the second phase can prune partitions too.
        if let Some(start_time) = query_params.start_time {
            filters.push(col(TIMESTAMP_COLUMN).gt_eq(lit_timestamp_nano(start_time)));
        }

        if let Some(end_time) = query_params.end_time {
            filters.push(col(TIMESTAMP_COLUMN).lt_eq(lit_timestamp_nano(end_time)));
        }

        match query_params.user_agent {
            TraceUserAgent::Grafana => {
                // grafana only use trace id and timestamp
                // clicking the trace id will invoke the query trace api
                // so we only need to return 1 span for each trace
                let table_name = ctx
                    .extension(JAEGER_QUERY_TABLE_NAME_KEY)
                    .unwrap_or(TRACE_TABLE_NAME);

                let table = get_table(ctx.clone(), self.catalog_manager(), table_name).await?;

                Ok(find_traces_rank_3(
                    table,
                    self.query_engine(),
                    filters,
                    vec![col(TIMESTAMP_COLUMN).sort(false, false)], // Sort by timestamp in descending order.
                )
                .await?)
            }
            _ => {
                // query all spans
                Ok(query_trace_table(
                    ctx,
                    self.catalog_manager(),
                    self.query_engine(),
                    vec![wildcard()],
                    filters,
                    vec![col(TIMESTAMP_COLUMN).sort(false, false)], // Sort by timestamp in descending order.
                    None,
                    None,
                    vec![],
                )
                .await?)
            }
        }
    }
}
302
/// Builds and executes a query against the trace table (or one of its derived
/// tables) and returns the result as a record-batch stream wrapped in [`Output`].
///
/// Table routing:
/// - selecting only `service_name` targets the trace-services table;
/// - a distinct on both `span_name` and `span_kind` targets the trace-operations table;
/// - everything else targets the trace table itself (name taken from the
///   `JAEGER_QUERY_TABLE_NAME_KEY` context extension, defaulting to `TRACE_TABLE_NAME`).
///
/// Clauses are applied in order: select → filters (+ tag filters) → distinct →
/// sort → limit.
#[allow(clippy::too_many_arguments)]
async fn query_trace_table(
    ctx: QueryContextRef,
    catalog_manager: &CatalogManagerRef,
    query_engine: &QueryEngineRef,
    selects: Vec<SelectExpr>,
    filters: Vec<Expr>,
    sorts: Vec<SortExpr>,
    limit: Option<usize>,
    tags: Option<HashMap<String, JsonValue>>,
    distincts: Vec<Expr>,
) -> ServerResult<Output> {
    let trace_table_name = ctx
        .extension(JAEGER_QUERY_TABLE_NAME_KEY)
        .unwrap_or(TRACE_TABLE_NAME);

    // If only select services, use the trace services table.
    // If querying operations (distinct by span_name and span_kind), use the trace operations table.
    let table_name = {
        if match selects.as_slice() {
            [SelectExpr::Expression(x)] => x == &col(SERVICE_NAME_COLUMN),
            _ => false,
        } {
            &trace_services_table_name(trace_table_name)
        } else if !distincts.is_empty()
            && distincts.contains(&col(SPAN_NAME_COLUMN))
            && distincts.contains(&col(SPAN_KIND_COLUMN))
        {
            &trace_operations_table_name(trace_table_name)
        } else {
            trace_table_name
        }
    };

    let table = catalog_manager
        .table(
            ctx.current_catalog(),
            &ctx.current_schema(),
            table_name,
            Some(&ctx),
        )
        .await
        .context(CatalogSnafu)?
        .with_context(|| TableNotFoundSnafu {
            table: table_name,
            catalog: ctx.current_catalog(),
            schema: ctx.current_schema(),
        })?;

    // v1 ("flatten") data model stores attributes as individual columns; this
    // decides which tag-filter strategy `tags_filters` uses.
    let is_data_model_v1 = table
        .clone()
        .table_info()
        .meta
        .options
        .extra_options
        .get(TABLE_DATA_MODEL)
        .map(|s| s.as_str())
        == Some(TABLE_DATA_MODEL_TRACE_V1);

    // collect to set
    // Names are wrapped in double quotes to match the quoted column keys
    // produced by `flatten_tag_filters`.
    let col_names = table
        .table_info()
        .meta
        .field_column_names()
        .map(|s| format!("\"{}\"", s))
        .collect::<HashSet<String>>();

    let df_context = create_df_context(query_engine)?;

    let dataframe = df_context
        .read_table(Arc::new(DfTableProviderAdapter::new(table)))
        .context(DataFusionSnafu)?;

    let dataframe = dataframe.select(selects).context(DataFusionSnafu)?;

    // Apply all filters.
    let dataframe = filters
        .into_iter()
        .chain(tags.map_or(Ok(vec![]), |t| {
            tags_filters(&dataframe, t, is_data_model_v1, &col_names)
        })?)
        .try_fold(dataframe, |df, expr| {
            df.filter(expr).context(DataFusionSnafu)
        })?;

    // Apply the distinct if needed.
    let dataframe = if !distincts.is_empty() {
        dataframe
            .distinct_on(distincts.clone(), distincts, None)
            .context(DataFusionSnafu)?
    } else {
        dataframe
    };

    // Apply the sorts if needed.
    let dataframe = if !sorts.is_empty() {
        dataframe.sort(sorts).context(DataFusionSnafu)?
    } else {
        dataframe
    };

    // Apply the limit if needed.
    let dataframe = if let Some(limit) = limit {
        dataframe.limit(0, Some(limit)).context(DataFusionSnafu)?
    } else {
        dataframe
    };

    // Execute the query and collect the result.
    let stream = dataframe.execute_stream().await.context(DataFusionSnafu)?;

    let output = Output::new_with_stream(Box::pin(
        RecordBatchStreamAdapter::try_new(stream).context(CollectRecordbatchSnafu)?,
    ));

    Ok(output)
}
420
421async fn get_table(
422    ctx: QueryContextRef,
423    catalog_manager: &CatalogManagerRef,
424    table_name: &str,
425) -> ServerResult<TableRef> {
426    catalog_manager
427        .table(
428            ctx.current_catalog(),
429            &ctx.current_schema(),
430            table_name,
431            Some(&ctx),
432        )
433        .await
434        .context(CatalogSnafu)?
435        .with_context(|| TableNotFoundSnafu {
436            table: table_name,
437            catalog: ctx.current_catalog(),
438            schema: ctx.current_schema(),
439        })
440}
441
442async fn find_traces_rank_3(
443    table: TableRef,
444    query_engine: &QueryEngineRef,
445    filters: Vec<Expr>,
446    sorts: Vec<SortExpr>,
447) -> ServerResult<Output> {
448    let df_context = create_df_context(query_engine)?;
449
450    let dataframe = df_context
451        .read_table(Arc::new(DfTableProviderAdapter::new(table)))
452        .context(DataFusionSnafu)?;
453
454    let dataframe = dataframe
455        .select(vec![wildcard()])
456        .context(DataFusionSnafu)?;
457
458    // Apply all filters.
459    let dataframe = filters.into_iter().try_fold(dataframe, |df, expr| {
460        df.filter(expr).context(DataFusionSnafu)
461    })?;
462
463    // Apply the sorts if needed.
464    let dataframe = if !sorts.is_empty() {
465        dataframe.sort(sorts).context(DataFusionSnafu)?
466    } else {
467        dataframe
468    };
469
470    // create rank column, for each trace, get the earliest 3 spans
471    let trace_id_col = vec![col(TRACE_ID_COLUMN)];
472    let timestamp_asc = vec![col(TIMESTAMP_COLUMN).sort(true, false)];
473
474    let dataframe = dataframe
475        .with_column(
476            KEY_RN,
477            row_number()
478                .partition_by(trace_id_col)
479                .order_by(timestamp_asc)
480                .build()
481                .context(DataFusionSnafu)?,
482        )
483        .context(DataFusionSnafu)?;
484
485    let dataframe = dataframe
486        .filter(col(KEY_RN).lt_eq(lit(3)))
487        .context(DataFusionSnafu)?;
488
489    // Execute the query and collect the result.
490    let stream = dataframe.execute_stream().await.context(DataFusionSnafu)?;
491
492    let output = Output::new_with_stream(Box::pin(
493        RecordBatchStreamAdapter::try_new(stream).context(CollectRecordbatchSnafu)?,
494    ));
495
496    Ok(output)
497}
498
499// The current implementation registers UDFs during the planning stage, which makes it difficult
500// to utilize them through DataFrame APIs. To address this limitation, we create a new session
501// context and register the required UDFs, allowing them to be decoupled from the global context.
502// TODO(zyy17): Is it possible or necessary to reuse the existing session context?
503fn create_df_context(query_engine: &QueryEngineRef) -> ServerResult<SessionContext> {
504    let df_context = SessionContext::new_with_state(
505        SessionStateBuilder::new_from_existing(query_engine.engine_state().session_state()).build(),
506    );
507
508    // The following JSON UDFs will be used for tags filters on v0 data model.
509    let udfs: Vec<FunctionRef> = vec![
510        Arc::new(JsonGetInt::default()),
511        Arc::new(JsonGetFloat::default()),
512        Arc::new(JsonGetBool::default()),
513        Arc::new(JsonGetString::default()),
514    ];
515
516    for udf in udfs {
517        df_context.register_udf(create_udf(udf));
518    }
519
520    Ok(df_context)
521}
522
523fn json_tag_filters(
524    dataframe: &DataFrame,
525    tags: HashMap<String, JsonValue>,
526) -> ServerResult<Vec<Expr>> {
527    let mut filters = vec![];
528
529    // NOTE: The key of the tags may contain `.`, for example: `http.status_code`, so we need to use `["http.status_code"]` in json path to access the value.
530    for (key, value) in tags.iter() {
531        if let JsonValue::String(value) = value {
532            filters.push(
533                dataframe
534                    .registry()
535                    .udf(JsonGetString::NAME)
536                    .context(DataFusionSnafu)?
537                    .call(vec![
538                        col(SPAN_ATTRIBUTES_COLUMN),
539                        lit(format!("[\"{}\"]", key)),
540                    ])
541                    .eq(lit(value)),
542            );
543        }
544        if let JsonValue::Number(value) = value {
545            if value.is_i64() {
546                filters.push(
547                    dataframe
548                        .registry()
549                        .udf(JsonGetInt::NAME)
550                        .context(DataFusionSnafu)?
551                        .call(vec![
552                            col(SPAN_ATTRIBUTES_COLUMN),
553                            lit(format!("[\"{}\"]", key)),
554                        ])
555                        .eq(lit(value.as_i64().unwrap())),
556                );
557            }
558            if value.is_f64() {
559                filters.push(
560                    dataframe
561                        .registry()
562                        .udf(JsonGetFloat::NAME)
563                        .context(DataFusionSnafu)?
564                        .call(vec![
565                            col(SPAN_ATTRIBUTES_COLUMN),
566                            lit(format!("[\"{}\"]", key)),
567                        ])
568                        .eq(lit(value.as_f64().unwrap())),
569                );
570            }
571        }
572        if let JsonValue::Bool(value) = value {
573            filters.push(
574                dataframe
575                    .registry()
576                    .udf(JsonGetBool::NAME)
577                    .context(DataFusionSnafu)?
578                    .call(vec![
579                        col(SPAN_ATTRIBUTES_COLUMN),
580                        lit(format!("[\"{}\"]", key)),
581                    ])
582                    .eq(lit(*value)),
583            );
584        }
585    }
586
587    Ok(filters)
588}
589
590/// Helper function to check if span_key or resource_key exists in col_names and create an expression.
591/// If neither exists, logs a warning and returns None.
592#[inline]
593fn check_col_and_build_expr<F>(
594    span_key: String,
595    resource_key: String,
596    key: &str,
597    col_names: &HashSet<String>,
598    expr_builder: F,
599) -> Option<Expr>
600where
601    F: FnOnce(String) -> Expr,
602{
603    if col_names.contains(&span_key) {
604        return Some(expr_builder(span_key));
605    }
606    if col_names.contains(&resource_key) {
607        return Some(expr_builder(resource_key));
608    }
609    warn!("tag key {} not found in table columns", key);
610    None
611}
612
613fn flatten_tag_filters(
614    tags: HashMap<String, JsonValue>,
615    col_names: &HashSet<String>,
616) -> ServerResult<Vec<Expr>> {
617    let filters = tags
618        .into_iter()
619        .filter_map(|(key, value)| {
620            if key == KEY_OTEL_STATUS_ERROR_KEY && value == JsonValue::Bool(true) {
621                return Some(col(SPAN_STATUS_CODE).eq(lit(SPAN_STATUS_ERROR)));
622            }
623
624            // TODO(shuiyisong): add more precise mapping from key to col name
625            let span_key = format!("\"span_attributes.{}\"", key);
626            let resource_key = format!("\"resource_attributes.{}\"", key);
627            match value {
628                JsonValue::String(value) => {
629                    check_col_and_build_expr(span_key, resource_key, &key, col_names, |k| {
630                        col(k).eq(lit(value))
631                    })
632                }
633                JsonValue::Number(value) => {
634                    if value.is_f64() {
635                        // safe to unwrap as checked previously
636                        let value = value.as_f64().unwrap();
637                        check_col_and_build_expr(span_key, resource_key, &key, col_names, |k| {
638                            col(k).eq(lit(value))
639                        })
640                    } else {
641                        let value = value.as_i64().unwrap();
642                        check_col_and_build_expr(span_key, resource_key, &key, col_names, |k| {
643                            col(k).eq(lit(value))
644                        })
645                    }
646                }
647                JsonValue::Bool(value) => {
648                    check_col_and_build_expr(span_key, resource_key, &key, col_names, |k| {
649                        col(k).eq(lit(value))
650                    })
651                }
652                JsonValue::Null => {
653                    check_col_and_build_expr(span_key, resource_key, &key, col_names, |k| {
654                        col(k).is_null()
655                    })
656                }
657                // not supported at the moment
658                JsonValue::Array(_value) => None,
659                JsonValue::Object(_value) => None,
660            }
661        })
662        .collect();
663    Ok(filters)
664}
665
666fn tags_filters(
667    dataframe: &DataFrame,
668    tags: HashMap<String, JsonValue>,
669    is_data_model_v1: bool,
670    col_names: &HashSet<String>,
671) -> ServerResult<Vec<Expr>> {
672    if is_data_model_v1 {
673        flatten_tag_filters(tags, col_names)
674    } else {
675        json_tag_filters(dataframe, tags)
676    }
677}
678
679// Get trace ids from the output in recordbatches.
680async fn trace_ids_from_output(output: Output) -> ServerResult<Vec<String>> {
681    if let OutputData::Stream(stream) = output.data {
682        let schema = stream.schema().clone();
683        let recordbatches = util::collect(stream)
684            .await
685            .context(CollectRecordbatchSnafu)?;
686
687        // Only contains `trace_id` column in string type.
688        if !recordbatches.is_empty()
689            && schema.num_columns() == 1
690            && schema.contains_column(TRACE_ID_COLUMN)
691        {
692            let mut trace_ids = vec![];
693            for recordbatch in recordbatches {
694                recordbatch
695                    .iter_column_as_string(0)
696                    .flatten()
697                    .for_each(|x| trace_ids.push(x));
698            }
699
700            return Ok(trace_ids);
701        }
702    }
703
704    Ok(vec![])
705}