frontend/instance/
jaeger.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{HashMap, HashSet};
16use std::sync::Arc;
17
18use async_trait::async_trait;
19use catalog::CatalogManagerRef;
20use common_catalog::consts::{
21    TRACE_TABLE_NAME, trace_operations_table_name, trace_services_table_name,
22};
23use common_function::function::FunctionRef;
24use common_function::scalars::json::json_get::{
25    JsonGetBool, JsonGetFloat, JsonGetInt, JsonGetString,
26};
27use common_function::scalars::udf::create_udf;
28use common_query::{Output, OutputData};
29use common_recordbatch::adapter::RecordBatchStreamAdapter;
30use common_recordbatch::util;
31use common_telemetry::warn;
32use datafusion::dataframe::DataFrame;
33use datafusion::execution::SessionStateBuilder;
34use datafusion::execution::context::SessionContext;
35use datafusion_expr::select_expr::SelectExpr;
36use datafusion_expr::{Expr, SortExpr, col, lit, lit_timestamp_nano, wildcard};
37use datatypes::value::ValueRef;
38use query::QueryEngineRef;
39use serde_json::Value as JsonValue;
40use servers::error::{
41    CatalogSnafu, CollectRecordbatchSnafu, DataFusionSnafu, Result as ServerResult,
42    TableNotFoundSnafu,
43};
44use servers::http::jaeger::{JAEGER_QUERY_TABLE_NAME_KEY, QueryTraceParams};
45use servers::otlp::trace::{
46    DURATION_NANO_COLUMN, KEY_OTEL_STATUS_ERROR_KEY, SERVICE_NAME_COLUMN, SPAN_ATTRIBUTES_COLUMN,
47    SPAN_KIND_COLUMN, SPAN_KIND_PREFIX, SPAN_NAME_COLUMN, SPAN_STATUS_CODE, SPAN_STATUS_ERROR,
48    TIMESTAMP_COLUMN, TRACE_ID_COLUMN,
49};
50use servers::query_handler::JaegerQueryHandler;
51use session::context::QueryContextRef;
52use snafu::{OptionExt, ResultExt};
53use table::requests::{TABLE_DATA_MODEL, TABLE_DATA_MODEL_TRACE_V1};
54use table::table::adapter::DfTableProviderAdapter;
55
56use crate::instance::Instance;
57
/// Row limit applied to Jaeger queries when the caller does not supply one.
const DEFAULT_LIMIT: usize = 2_000;
59
60#[async_trait]
61impl JaegerQueryHandler for Instance {
62    async fn get_services(&self, ctx: QueryContextRef) -> ServerResult<Output> {
63        // It's equivalent to `SELECT DISTINCT(service_name) FROM {db}.{trace_table}`.
64        Ok(query_trace_table(
65            ctx,
66            self.catalog_manager(),
67            self.query_engine(),
68            vec![SelectExpr::from(col(SERVICE_NAME_COLUMN))],
69            vec![],
70            vec![],
71            None,
72            None,
73            vec![col(SERVICE_NAME_COLUMN)],
74        )
75        .await?)
76    }
77
78    async fn get_operations(
79        &self,
80        ctx: QueryContextRef,
81        service_name: &str,
82        span_kind: Option<&str>,
83    ) -> ServerResult<Output> {
84        let mut filters = vec![col(SERVICE_NAME_COLUMN).eq(lit(service_name))];
85
86        if let Some(span_kind) = span_kind {
87            filters.push(col(SPAN_KIND_COLUMN).eq(lit(format!(
88                "{}{}",
89                SPAN_KIND_PREFIX,
90                span_kind.to_uppercase()
91            ))));
92        }
93
94        // It's equivalent to the following SQL query:
95        //
96        // ```
97        // SELECT DISTINCT span_name, span_kind
98        // FROM
99        //   {db}.{trace_table}
100        // WHERE
101        //   service_name = '{service_name}' AND
102        //   span_kind = '{span_kind}'
103        // ORDER BY
104        //   span_name ASC
105        // ```.
106        Ok(query_trace_table(
107            ctx,
108            self.catalog_manager(),
109            self.query_engine(),
110            vec![
111                SelectExpr::from(col(SPAN_NAME_COLUMN)),
112                SelectExpr::from(col(SPAN_KIND_COLUMN)),
113                SelectExpr::from(col(SERVICE_NAME_COLUMN)),
114                SelectExpr::from(col(TIMESTAMP_COLUMN)),
115            ],
116            filters,
117            vec![col(SPAN_NAME_COLUMN).sort(true, false)], // Sort by span_name in ascending order.
118            Some(DEFAULT_LIMIT),
119            None,
120            vec![col(SPAN_NAME_COLUMN), col(SPAN_KIND_COLUMN)],
121        )
122        .await?)
123    }
124
125    async fn get_trace(
126        &self,
127        ctx: QueryContextRef,
128        trace_id: &str,
129        start_time: Option<i64>,
130        end_time: Option<i64>,
131    ) -> ServerResult<Output> {
132        // It's equivalent to the following SQL query:
133        //
134        // ```
135        // SELECT
136        //   *
137        // FROM
138        //   {db}.{trace_table}
139        // WHERE
140        //   trace_id = '{trace_id}' AND
141        //   timestamp >= {start_time} AND
142        //   timestamp <= {end_time}
143        // ORDER BY
144        //   timestamp DESC
145        // ```.
146        let selects = vec![wildcard()];
147
148        let mut filters = vec![col(TRACE_ID_COLUMN).eq(lit(trace_id))];
149
150        if let Some(start_time) = start_time {
151            filters.push(col(TIMESTAMP_COLUMN).gt_eq(lit_timestamp_nano(start_time)));
152        }
153
154        if let Some(end_time) = end_time {
155            filters.push(col(TIMESTAMP_COLUMN).lt_eq(lit_timestamp_nano(end_time)));
156        }
157
158        Ok(query_trace_table(
159            ctx,
160            self.catalog_manager(),
161            self.query_engine(),
162            selects,
163            filters,
164            vec![col(TIMESTAMP_COLUMN).sort(false, false)], // Sort by timestamp in descending order.
165            Some(DEFAULT_LIMIT),
166            None,
167            vec![],
168        )
169        .await?)
170    }
171
172    async fn find_traces(
173        &self,
174        ctx: QueryContextRef,
175        query_params: QueryTraceParams,
176    ) -> ServerResult<Output> {
177        let mut filters = vec![];
178
179        // `service_name` is already validated in `from_jaeger_query_params()`, so no additional check needed here.
180        filters.push(col(SERVICE_NAME_COLUMN).eq(lit(query_params.service_name)));
181
182        if let Some(operation_name) = query_params.operation_name {
183            filters.push(col(SPAN_NAME_COLUMN).eq(lit(operation_name)));
184        }
185
186        if let Some(start_time) = query_params.start_time {
187            filters.push(col(TIMESTAMP_COLUMN).gt_eq(lit_timestamp_nano(start_time)));
188        }
189
190        if let Some(end_time) = query_params.end_time {
191            filters.push(col(TIMESTAMP_COLUMN).lt_eq(lit_timestamp_nano(end_time)));
192        }
193
194        if let Some(min_duration) = query_params.min_duration {
195            filters.push(col(DURATION_NANO_COLUMN).gt_eq(lit(min_duration)));
196        }
197
198        if let Some(max_duration) = query_params.max_duration {
199            filters.push(col(DURATION_NANO_COLUMN).lt_eq(lit(max_duration)));
200        }
201
202        // Get all distinct trace ids that match the filters.
203        // It's equivalent to the following SQL query:
204        //
205        // ```
206        // SELECT DISTINCT trace_id
207        // FROM
208        //   {db}.{trace_table}
209        // WHERE
210        //   service_name = '{service_name}' AND
211        //   operation_name = '{operation_name}' AND
212        //   timestamp >= {start_time} AND
213        //   timestamp <= {end_time} AND
214        //   duration >= {min_duration} AND
215        //   duration <= {max_duration}
216        // LIMIT {limit}
217        // ```.
218        let output = query_trace_table(
219            ctx.clone(),
220            self.catalog_manager(),
221            self.query_engine(),
222            vec![wildcard()],
223            filters,
224            vec![],
225            Some(query_params.limit.unwrap_or(DEFAULT_LIMIT)),
226            query_params.tags,
227            vec![col(TRACE_ID_COLUMN)],
228        )
229        .await?;
230
231        // Get all traces that match the trace ids from the previous query.
232        // It's equivalent to the following SQL query:
233        //
234        // ```
235        // SELECT *
236        // FROM
237        //   {db}.{trace_table}
238        // WHERE
239        //   trace_id IN ({trace_ids}) AND
240        //   timestamp >= {start_time} AND
241        //   timestamp <= {end_time}
242        // ```
243        let mut filters = vec![
244            col(TRACE_ID_COLUMN).in_list(
245                trace_ids_from_output(output)
246                    .await?
247                    .iter()
248                    .map(lit)
249                    .collect::<Vec<Expr>>(),
250                false,
251            ),
252        ];
253
254        if let Some(start_time) = query_params.start_time {
255            filters.push(col(TIMESTAMP_COLUMN).gt_eq(lit_timestamp_nano(start_time)));
256        }
257
258        if let Some(end_time) = query_params.end_time {
259            filters.push(col(TIMESTAMP_COLUMN).lt_eq(lit_timestamp_nano(end_time)));
260        }
261
262        Ok(query_trace_table(
263            ctx,
264            self.catalog_manager(),
265            self.query_engine(),
266            vec![wildcard()],
267            filters,
268            vec![col(TIMESTAMP_COLUMN).sort(false, false)], // Sort by timestamp in descending order.
269            None,
270            None,
271            vec![],
272        )
273        .await?)
274    }
275}
276
277#[allow(clippy::too_many_arguments)]
278async fn query_trace_table(
279    ctx: QueryContextRef,
280    catalog_manager: &CatalogManagerRef,
281    query_engine: &QueryEngineRef,
282    selects: Vec<SelectExpr>,
283    filters: Vec<Expr>,
284    sorts: Vec<SortExpr>,
285    limit: Option<usize>,
286    tags: Option<HashMap<String, JsonValue>>,
287    distincts: Vec<Expr>,
288) -> ServerResult<Output> {
289    let trace_table_name = ctx
290        .extension(JAEGER_QUERY_TABLE_NAME_KEY)
291        .unwrap_or(TRACE_TABLE_NAME);
292
293    // If only select services, use the trace services table.
294    // If querying operations (distinct by span_name and span_kind), use the trace operations table.
295    let table_name = {
296        if match selects.as_slice() {
297            [SelectExpr::Expression(x)] => x == &col(SERVICE_NAME_COLUMN),
298            _ => false,
299        } {
300            &trace_services_table_name(trace_table_name)
301        } else if !distincts.is_empty()
302            && distincts.contains(&col(SPAN_NAME_COLUMN))
303            && distincts.contains(&col(SPAN_KIND_COLUMN))
304        {
305            &trace_operations_table_name(trace_table_name)
306        } else {
307            trace_table_name
308        }
309    };
310
311    let table = catalog_manager
312        .table(
313            ctx.current_catalog(),
314            &ctx.current_schema(),
315            table_name,
316            Some(&ctx),
317        )
318        .await
319        .context(CatalogSnafu)?
320        .with_context(|| TableNotFoundSnafu {
321            table: table_name,
322            catalog: ctx.current_catalog(),
323            schema: ctx.current_schema(),
324        })?;
325
326    let is_data_model_v1 = table
327        .clone()
328        .table_info()
329        .meta
330        .options
331        .extra_options
332        .get(TABLE_DATA_MODEL)
333        .map(|s| s.as_str())
334        == Some(TABLE_DATA_MODEL_TRACE_V1);
335
336    // collect to set
337    let col_names = table
338        .table_info()
339        .meta
340        .field_column_names()
341        .map(|s| format!("\"{}\"", s))
342        .collect::<HashSet<String>>();
343
344    let df_context = create_df_context(query_engine)?;
345
346    let dataframe = df_context
347        .read_table(Arc::new(DfTableProviderAdapter::new(table)))
348        .context(DataFusionSnafu)?;
349
350    let dataframe = dataframe.select(selects).context(DataFusionSnafu)?;
351
352    // Apply all filters.
353    let dataframe = filters
354        .into_iter()
355        .chain(tags.map_or(Ok(vec![]), |t| {
356            tags_filters(&dataframe, t, is_data_model_v1, &col_names)
357        })?)
358        .try_fold(dataframe, |df, expr| {
359            df.filter(expr).context(DataFusionSnafu)
360        })?;
361
362    // Apply the distinct if needed.
363    let dataframe = if !distincts.is_empty() {
364        dataframe
365            .distinct_on(distincts.clone(), distincts, None)
366            .context(DataFusionSnafu)?
367    } else {
368        dataframe
369    };
370
371    // Apply the sorts if needed.
372    let dataframe = if !sorts.is_empty() {
373        dataframe.sort(sorts).context(DataFusionSnafu)?
374    } else {
375        dataframe
376    };
377
378    // Apply the limit if needed.
379    let dataframe = if let Some(limit) = limit {
380        dataframe.limit(0, Some(limit)).context(DataFusionSnafu)?
381    } else {
382        dataframe
383    };
384
385    // Execute the query and collect the result.
386    let stream = dataframe.execute_stream().await.context(DataFusionSnafu)?;
387
388    let output = Output::new_with_stream(Box::pin(
389        RecordBatchStreamAdapter::try_new(stream).context(CollectRecordbatchSnafu)?,
390    ));
391
392    Ok(output)
393}
394
395// The current implementation registers UDFs during the planning stage, which makes it difficult
396// to utilize them through DataFrame APIs. To address this limitation, we create a new session
397// context and register the required UDFs, allowing them to be decoupled from the global context.
398// TODO(zyy17): Is it possible or necessary to reuse the existing session context?
399fn create_df_context(query_engine: &QueryEngineRef) -> ServerResult<SessionContext> {
400    let df_context = SessionContext::new_with_state(
401        SessionStateBuilder::new_from_existing(query_engine.engine_state().session_state()).build(),
402    );
403
404    // The following JSON UDFs will be used for tags filters on v0 data model.
405    let udfs: Vec<FunctionRef> = vec![
406        Arc::new(JsonGetInt::default()),
407        Arc::new(JsonGetFloat::default()),
408        Arc::new(JsonGetBool::default()),
409        Arc::new(JsonGetString::default()),
410    ];
411
412    for udf in udfs {
413        df_context.register_udf(create_udf(udf));
414    }
415
416    Ok(df_context)
417}
418
419fn json_tag_filters(
420    dataframe: &DataFrame,
421    tags: HashMap<String, JsonValue>,
422) -> ServerResult<Vec<Expr>> {
423    let mut filters = vec![];
424
425    // NOTE: The key of the tags may contain `.`, for example: `http.status_code`, so we need to use `["http.status_code"]` in json path to access the value.
426    for (key, value) in tags.iter() {
427        if let JsonValue::String(value) = value {
428            filters.push(
429                dataframe
430                    .registry()
431                    .udf(JsonGetString::NAME)
432                    .context(DataFusionSnafu)?
433                    .call(vec![
434                        col(SPAN_ATTRIBUTES_COLUMN),
435                        lit(format!("[\"{}\"]", key)),
436                    ])
437                    .eq(lit(value)),
438            );
439        }
440        if let JsonValue::Number(value) = value {
441            if value.is_i64() {
442                filters.push(
443                    dataframe
444                        .registry()
445                        .udf(JsonGetInt::NAME)
446                        .context(DataFusionSnafu)?
447                        .call(vec![
448                            col(SPAN_ATTRIBUTES_COLUMN),
449                            lit(format!("[\"{}\"]", key)),
450                        ])
451                        .eq(lit(value.as_i64().unwrap())),
452                );
453            }
454            if value.is_f64() {
455                filters.push(
456                    dataframe
457                        .registry()
458                        .udf(JsonGetFloat::NAME)
459                        .context(DataFusionSnafu)?
460                        .call(vec![
461                            col(SPAN_ATTRIBUTES_COLUMN),
462                            lit(format!("[\"{}\"]", key)),
463                        ])
464                        .eq(lit(value.as_f64().unwrap())),
465                );
466            }
467        }
468        if let JsonValue::Bool(value) = value {
469            filters.push(
470                dataframe
471                    .registry()
472                    .udf(JsonGetBool::NAME)
473                    .context(DataFusionSnafu)?
474                    .call(vec![
475                        col(SPAN_ATTRIBUTES_COLUMN),
476                        lit(format!("[\"{}\"]", key)),
477                    ])
478                    .eq(lit(*value)),
479            );
480        }
481    }
482
483    Ok(filters)
484}
485
486/// Helper function to check if span_key or resource_key exists in col_names and create an expression.
487/// If neither exists, logs a warning and returns None.
488#[inline]
489fn check_col_and_build_expr<F>(
490    span_key: String,
491    resource_key: String,
492    key: &str,
493    col_names: &HashSet<String>,
494    expr_builder: F,
495) -> Option<Expr>
496where
497    F: FnOnce(String) -> Expr,
498{
499    if col_names.contains(&span_key) {
500        return Some(expr_builder(span_key));
501    }
502    if col_names.contains(&resource_key) {
503        return Some(expr_builder(resource_key));
504    }
505    warn!("tag key {} not found in table columns", key);
506    None
507}
508
509fn flatten_tag_filters(
510    tags: HashMap<String, JsonValue>,
511    col_names: &HashSet<String>,
512) -> ServerResult<Vec<Expr>> {
513    let filters = tags
514        .into_iter()
515        .filter_map(|(key, value)| {
516            if key == KEY_OTEL_STATUS_ERROR_KEY && value == JsonValue::Bool(true) {
517                return Some(col(SPAN_STATUS_CODE).eq(lit(SPAN_STATUS_ERROR)));
518            }
519
520            // TODO(shuiyisong): add more precise mapping from key to col name
521            let span_key = format!("\"span_attributes.{}\"", key);
522            let resource_key = format!("\"resource_attributes.{}\"", key);
523            match value {
524                JsonValue::String(value) => {
525                    check_col_and_build_expr(span_key, resource_key, &key, col_names, |k| {
526                        col(k).eq(lit(value))
527                    })
528                }
529                JsonValue::Number(value) => {
530                    if value.is_f64() {
531                        // safe to unwrap as checked previously
532                        let value = value.as_f64().unwrap();
533                        check_col_and_build_expr(span_key, resource_key, &key, col_names, |k| {
534                            col(k).eq(lit(value))
535                        })
536                    } else {
537                        let value = value.as_i64().unwrap();
538                        check_col_and_build_expr(span_key, resource_key, &key, col_names, |k| {
539                            col(k).eq(lit(value))
540                        })
541                    }
542                }
543                JsonValue::Bool(value) => {
544                    check_col_and_build_expr(span_key, resource_key, &key, col_names, |k| {
545                        col(k).eq(lit(value))
546                    })
547                }
548                JsonValue::Null => {
549                    check_col_and_build_expr(span_key, resource_key, &key, col_names, |k| {
550                        col(k).is_null()
551                    })
552                }
553                // not supported at the moment
554                JsonValue::Array(_value) => None,
555                JsonValue::Object(_value) => None,
556            }
557        })
558        .collect();
559    Ok(filters)
560}
561
562fn tags_filters(
563    dataframe: &DataFrame,
564    tags: HashMap<String, JsonValue>,
565    is_data_model_v1: bool,
566    col_names: &HashSet<String>,
567) -> ServerResult<Vec<Expr>> {
568    if is_data_model_v1 {
569        flatten_tag_filters(tags, col_names)
570    } else {
571        json_tag_filters(dataframe, tags)
572    }
573}
574
575// Get trace ids from the output in recordbatches.
576async fn trace_ids_from_output(output: Output) -> ServerResult<Vec<String>> {
577    if let OutputData::Stream(stream) = output.data {
578        let schema = stream.schema().clone();
579        let recordbatches = util::collect(stream)
580            .await
581            .context(CollectRecordbatchSnafu)?;
582
583        // Only contains `trace_id` column in string type.
584        if !recordbatches.is_empty()
585            && schema.num_columns() == 1
586            && schema.contains_column(TRACE_ID_COLUMN)
587        {
588            let mut trace_ids = vec![];
589            for recordbatch in recordbatches {
590                for col in recordbatch.columns().iter() {
591                    for row_idx in 0..recordbatch.num_rows() {
592                        if let ValueRef::String(value) = col.get_ref(row_idx) {
593                            trace_ids.push(value.to_string());
594                        }
595                    }
596                }
597            }
598
599            return Ok(trace_ids);
600        }
601    }
602
603    Ok(vec![])
604}