frontend/instance/
jaeger.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{HashMap, HashSet};
16use std::sync::Arc;
17
18use async_trait::async_trait;
19use catalog::CatalogManagerRef;
20use common_catalog::consts::{
21    TRACE_TABLE_NAME, trace_operations_table_name, trace_services_table_name,
22};
23use common_function::function::FunctionRef;
24use common_function::scalars::json::json_get::{
25    JsonGetBool, JsonGetFloat, JsonGetInt, JsonGetString,
26};
27use common_function::scalars::udf::create_udf;
28use common_query::{Output, OutputData};
29use common_recordbatch::adapter::RecordBatchStreamAdapter;
30use common_recordbatch::util;
31use common_telemetry::warn;
32use datafusion::dataframe::DataFrame;
33use datafusion::execution::SessionStateBuilder;
34use datafusion::execution::context::SessionContext;
35use datafusion_expr::select_expr::SelectExpr;
36use datafusion_expr::{Expr, SortExpr, col, lit, lit_timestamp_nano, wildcard};
37use datatypes::value::ValueRef;
38use query::QueryEngineRef;
39use serde_json::Value as JsonValue;
40use servers::error::{
41    CatalogSnafu, CollectRecordbatchSnafu, DataFusionSnafu, Result as ServerResult,
42    TableNotFoundSnafu,
43};
44use servers::http::jaeger::{JAEGER_QUERY_TABLE_NAME_KEY, QueryTraceParams};
45use servers::otlp::trace::{
46    DURATION_NANO_COLUMN, KEY_OTEL_STATUS_ERROR_KEY, SERVICE_NAME_COLUMN, SPAN_ATTRIBUTES_COLUMN,
47    SPAN_KIND_COLUMN, SPAN_KIND_PREFIX, SPAN_NAME_COLUMN, SPAN_STATUS_CODE, SPAN_STATUS_ERROR,
48    TIMESTAMP_COLUMN, TRACE_ID_COLUMN,
49};
50use servers::query_handler::JaegerQueryHandler;
51use session::context::QueryContextRef;
52use snafu::{OptionExt, ResultExt};
53use table::requests::{TABLE_DATA_MODEL, TABLE_DATA_MODEL_TRACE_V1};
54use table::table::adapter::DfTableProviderAdapter;
55
56use crate::instance::Instance;
57
/// Row cap used by the Jaeger queries in this module when the caller supplies no explicit limit.
const DEFAULT_LIMIT: usize = 2_000;
59
60#[async_trait]
61impl JaegerQueryHandler for Instance {
62    async fn get_services(&self, ctx: QueryContextRef) -> ServerResult<Output> {
63        // It's equivalent to `SELECT DISTINCT(service_name) FROM {db}.{trace_table}`.
64        Ok(query_trace_table(
65            ctx,
66            self.catalog_manager(),
67            self.query_engine(),
68            vec![SelectExpr::from(col(SERVICE_NAME_COLUMN))],
69            vec![],
70            vec![],
71            None,
72            None,
73            vec![col(SERVICE_NAME_COLUMN)],
74        )
75        .await?)
76    }
77
78    async fn get_operations(
79        &self,
80        ctx: QueryContextRef,
81        service_name: &str,
82        span_kind: Option<&str>,
83    ) -> ServerResult<Output> {
84        let mut filters = vec![col(SERVICE_NAME_COLUMN).eq(lit(service_name))];
85
86        if let Some(span_kind) = span_kind {
87            filters.push(col(SPAN_KIND_COLUMN).eq(lit(format!(
88                "{}{}",
89                SPAN_KIND_PREFIX,
90                span_kind.to_uppercase()
91            ))));
92        }
93
94        // It's equivalent to the following SQL query:
95        //
96        // ```
97        // SELECT DISTINCT span_name, span_kind
98        // FROM
99        //   {db}.{trace_table}
100        // WHERE
101        //   service_name = '{service_name}' AND
102        //   span_kind = '{span_kind}'
103        // ORDER BY
104        //   span_name ASC
105        // ```.
106        Ok(query_trace_table(
107            ctx,
108            self.catalog_manager(),
109            self.query_engine(),
110            vec![
111                SelectExpr::from(col(SPAN_NAME_COLUMN)),
112                SelectExpr::from(col(SPAN_KIND_COLUMN)),
113                SelectExpr::from(col(SERVICE_NAME_COLUMN)),
114                SelectExpr::from(col(TIMESTAMP_COLUMN)),
115            ],
116            filters,
117            vec![col(SPAN_NAME_COLUMN).sort(true, false)], // Sort by span_name in ascending order.
118            Some(DEFAULT_LIMIT),
119            None,
120            vec![col(SPAN_NAME_COLUMN), col(SPAN_KIND_COLUMN)],
121        )
122        .await?)
123    }
124
125    async fn get_trace(
126        &self,
127        ctx: QueryContextRef,
128        trace_id: &str,
129        start_time: Option<i64>,
130        end_time: Option<i64>,
131        limit: Option<usize>,
132    ) -> ServerResult<Output> {
133        // It's equivalent to the following SQL query:
134        //
135        // ```
136        // SELECT
137        //   *
138        // FROM
139        //   {db}.{trace_table}
140        // WHERE
141        //   trace_id = '{trace_id}' AND
142        //   timestamp >= {start_time} AND
143        //   timestamp <= {end_time}
144        // ORDER BY
145        //   timestamp DESC
146        // ```.
147        let selects = vec![wildcard()];
148
149        let mut filters = vec![col(TRACE_ID_COLUMN).eq(lit(trace_id))];
150
151        if let Some(start_time) = start_time {
152            filters.push(col(TIMESTAMP_COLUMN).gt_eq(lit_timestamp_nano(start_time)));
153        }
154
155        if let Some(end_time) = end_time {
156            filters.push(col(TIMESTAMP_COLUMN).lt_eq(lit_timestamp_nano(end_time)));
157        }
158
159        let limit = if start_time.is_some() && end_time.is_some() {
160            // allow unlimited limit if time range is specified
161            limit
162        } else {
163            limit.or(Some(DEFAULT_LIMIT))
164        };
165
166        Ok(query_trace_table(
167            ctx,
168            self.catalog_manager(),
169            self.query_engine(),
170            selects,
171            filters,
172            vec![col(TIMESTAMP_COLUMN).sort(false, false)], // Sort by timestamp in descending order.
173            limit,
174            None,
175            vec![],
176        )
177        .await?)
178    }
179
180    async fn find_traces(
181        &self,
182        ctx: QueryContextRef,
183        query_params: QueryTraceParams,
184    ) -> ServerResult<Output> {
185        let mut filters = vec![];
186
187        // `service_name` is already validated in `from_jaeger_query_params()`, so no additional check needed here.
188        filters.push(col(SERVICE_NAME_COLUMN).eq(lit(query_params.service_name)));
189
190        if let Some(operation_name) = query_params.operation_name {
191            filters.push(col(SPAN_NAME_COLUMN).eq(lit(operation_name)));
192        }
193
194        if let Some(start_time) = query_params.start_time {
195            filters.push(col(TIMESTAMP_COLUMN).gt_eq(lit_timestamp_nano(start_time)));
196        }
197
198        if let Some(end_time) = query_params.end_time {
199            filters.push(col(TIMESTAMP_COLUMN).lt_eq(lit_timestamp_nano(end_time)));
200        }
201
202        if let Some(min_duration) = query_params.min_duration {
203            filters.push(col(DURATION_NANO_COLUMN).gt_eq(lit(min_duration)));
204        }
205
206        if let Some(max_duration) = query_params.max_duration {
207            filters.push(col(DURATION_NANO_COLUMN).lt_eq(lit(max_duration)));
208        }
209
210        // Get all distinct trace ids that match the filters.
211        // It's equivalent to the following SQL query:
212        //
213        // ```
214        // SELECT DISTINCT trace_id
215        // FROM
216        //   {db}.{trace_table}
217        // WHERE
218        //   service_name = '{service_name}' AND
219        //   operation_name = '{operation_name}' AND
220        //   timestamp >= {start_time} AND
221        //   timestamp <= {end_time} AND
222        //   duration >= {min_duration} AND
223        //   duration <= {max_duration}
224        // LIMIT {limit}
225        // ```.
226        let output = query_trace_table(
227            ctx.clone(),
228            self.catalog_manager(),
229            self.query_engine(),
230            vec![wildcard()],
231            filters,
232            vec![],
233            Some(query_params.limit.unwrap_or(DEFAULT_LIMIT)),
234            query_params.tags,
235            vec![col(TRACE_ID_COLUMN)],
236        )
237        .await?;
238
239        // Get all traces that match the trace ids from the previous query.
240        // It's equivalent to the following SQL query:
241        //
242        // ```
243        // SELECT *
244        // FROM
245        //   {db}.{trace_table}
246        // WHERE
247        //   trace_id IN ({trace_ids}) AND
248        //   timestamp >= {start_time} AND
249        //   timestamp <= {end_time}
250        // ```
251        let mut filters = vec![
252            col(TRACE_ID_COLUMN).in_list(
253                trace_ids_from_output(output)
254                    .await?
255                    .iter()
256                    .map(lit)
257                    .collect::<Vec<Expr>>(),
258                false,
259            ),
260        ];
261
262        if let Some(start_time) = query_params.start_time {
263            filters.push(col(TIMESTAMP_COLUMN).gt_eq(lit_timestamp_nano(start_time)));
264        }
265
266        if let Some(end_time) = query_params.end_time {
267            filters.push(col(TIMESTAMP_COLUMN).lt_eq(lit_timestamp_nano(end_time)));
268        }
269
270        Ok(query_trace_table(
271            ctx,
272            self.catalog_manager(),
273            self.query_engine(),
274            vec![wildcard()],
275            filters,
276            vec![col(TIMESTAMP_COLUMN).sort(false, false)], // Sort by timestamp in descending order.
277            None,
278            None,
279            vec![],
280        )
281        .await?)
282    }
283}
284
285#[allow(clippy::too_many_arguments)]
286async fn query_trace_table(
287    ctx: QueryContextRef,
288    catalog_manager: &CatalogManagerRef,
289    query_engine: &QueryEngineRef,
290    selects: Vec<SelectExpr>,
291    filters: Vec<Expr>,
292    sorts: Vec<SortExpr>,
293    limit: Option<usize>,
294    tags: Option<HashMap<String, JsonValue>>,
295    distincts: Vec<Expr>,
296) -> ServerResult<Output> {
297    let trace_table_name = ctx
298        .extension(JAEGER_QUERY_TABLE_NAME_KEY)
299        .unwrap_or(TRACE_TABLE_NAME);
300
301    // If only select services, use the trace services table.
302    // If querying operations (distinct by span_name and span_kind), use the trace operations table.
303    let table_name = {
304        if match selects.as_slice() {
305            [SelectExpr::Expression(x)] => x == &col(SERVICE_NAME_COLUMN),
306            _ => false,
307        } {
308            &trace_services_table_name(trace_table_name)
309        } else if !distincts.is_empty()
310            && distincts.contains(&col(SPAN_NAME_COLUMN))
311            && distincts.contains(&col(SPAN_KIND_COLUMN))
312        {
313            &trace_operations_table_name(trace_table_name)
314        } else {
315            trace_table_name
316        }
317    };
318
319    let table = catalog_manager
320        .table(
321            ctx.current_catalog(),
322            &ctx.current_schema(),
323            table_name,
324            Some(&ctx),
325        )
326        .await
327        .context(CatalogSnafu)?
328        .with_context(|| TableNotFoundSnafu {
329            table: table_name,
330            catalog: ctx.current_catalog(),
331            schema: ctx.current_schema(),
332        })?;
333
334    let is_data_model_v1 = table
335        .clone()
336        .table_info()
337        .meta
338        .options
339        .extra_options
340        .get(TABLE_DATA_MODEL)
341        .map(|s| s.as_str())
342        == Some(TABLE_DATA_MODEL_TRACE_V1);
343
344    // collect to set
345    let col_names = table
346        .table_info()
347        .meta
348        .field_column_names()
349        .map(|s| format!("\"{}\"", s))
350        .collect::<HashSet<String>>();
351
352    let df_context = create_df_context(query_engine)?;
353
354    let dataframe = df_context
355        .read_table(Arc::new(DfTableProviderAdapter::new(table)))
356        .context(DataFusionSnafu)?;
357
358    let dataframe = dataframe.select(selects).context(DataFusionSnafu)?;
359
360    // Apply all filters.
361    let dataframe = filters
362        .into_iter()
363        .chain(tags.map_or(Ok(vec![]), |t| {
364            tags_filters(&dataframe, t, is_data_model_v1, &col_names)
365        })?)
366        .try_fold(dataframe, |df, expr| {
367            df.filter(expr).context(DataFusionSnafu)
368        })?;
369
370    // Apply the distinct if needed.
371    let dataframe = if !distincts.is_empty() {
372        dataframe
373            .distinct_on(distincts.clone(), distincts, None)
374            .context(DataFusionSnafu)?
375    } else {
376        dataframe
377    };
378
379    // Apply the sorts if needed.
380    let dataframe = if !sorts.is_empty() {
381        dataframe.sort(sorts).context(DataFusionSnafu)?
382    } else {
383        dataframe
384    };
385
386    // Apply the limit if needed.
387    let dataframe = if let Some(limit) = limit {
388        dataframe.limit(0, Some(limit)).context(DataFusionSnafu)?
389    } else {
390        dataframe
391    };
392
393    // Execute the query and collect the result.
394    let stream = dataframe.execute_stream().await.context(DataFusionSnafu)?;
395
396    let output = Output::new_with_stream(Box::pin(
397        RecordBatchStreamAdapter::try_new(stream).context(CollectRecordbatchSnafu)?,
398    ));
399
400    Ok(output)
401}
402
403// The current implementation registers UDFs during the planning stage, which makes it difficult
404// to utilize them through DataFrame APIs. To address this limitation, we create a new session
405// context and register the required UDFs, allowing them to be decoupled from the global context.
406// TODO(zyy17): Is it possible or necessary to reuse the existing session context?
407fn create_df_context(query_engine: &QueryEngineRef) -> ServerResult<SessionContext> {
408    let df_context = SessionContext::new_with_state(
409        SessionStateBuilder::new_from_existing(query_engine.engine_state().session_state()).build(),
410    );
411
412    // The following JSON UDFs will be used for tags filters on v0 data model.
413    let udfs: Vec<FunctionRef> = vec![
414        Arc::new(JsonGetInt::default()),
415        Arc::new(JsonGetFloat::default()),
416        Arc::new(JsonGetBool::default()),
417        Arc::new(JsonGetString::default()),
418    ];
419
420    for udf in udfs {
421        df_context.register_udf(create_udf(udf));
422    }
423
424    Ok(df_context)
425}
426
427fn json_tag_filters(
428    dataframe: &DataFrame,
429    tags: HashMap<String, JsonValue>,
430) -> ServerResult<Vec<Expr>> {
431    let mut filters = vec![];
432
433    // NOTE: The key of the tags may contain `.`, for example: `http.status_code`, so we need to use `["http.status_code"]` in json path to access the value.
434    for (key, value) in tags.iter() {
435        if let JsonValue::String(value) = value {
436            filters.push(
437                dataframe
438                    .registry()
439                    .udf(JsonGetString::NAME)
440                    .context(DataFusionSnafu)?
441                    .call(vec![
442                        col(SPAN_ATTRIBUTES_COLUMN),
443                        lit(format!("[\"{}\"]", key)),
444                    ])
445                    .eq(lit(value)),
446            );
447        }
448        if let JsonValue::Number(value) = value {
449            if value.is_i64() {
450                filters.push(
451                    dataframe
452                        .registry()
453                        .udf(JsonGetInt::NAME)
454                        .context(DataFusionSnafu)?
455                        .call(vec![
456                            col(SPAN_ATTRIBUTES_COLUMN),
457                            lit(format!("[\"{}\"]", key)),
458                        ])
459                        .eq(lit(value.as_i64().unwrap())),
460                );
461            }
462            if value.is_f64() {
463                filters.push(
464                    dataframe
465                        .registry()
466                        .udf(JsonGetFloat::NAME)
467                        .context(DataFusionSnafu)?
468                        .call(vec![
469                            col(SPAN_ATTRIBUTES_COLUMN),
470                            lit(format!("[\"{}\"]", key)),
471                        ])
472                        .eq(lit(value.as_f64().unwrap())),
473                );
474            }
475        }
476        if let JsonValue::Bool(value) = value {
477            filters.push(
478                dataframe
479                    .registry()
480                    .udf(JsonGetBool::NAME)
481                    .context(DataFusionSnafu)?
482                    .call(vec![
483                        col(SPAN_ATTRIBUTES_COLUMN),
484                        lit(format!("[\"{}\"]", key)),
485                    ])
486                    .eq(lit(*value)),
487            );
488        }
489    }
490
491    Ok(filters)
492}
493
494/// Helper function to check if span_key or resource_key exists in col_names and create an expression.
495/// If neither exists, logs a warning and returns None.
496#[inline]
497fn check_col_and_build_expr<F>(
498    span_key: String,
499    resource_key: String,
500    key: &str,
501    col_names: &HashSet<String>,
502    expr_builder: F,
503) -> Option<Expr>
504where
505    F: FnOnce(String) -> Expr,
506{
507    if col_names.contains(&span_key) {
508        return Some(expr_builder(span_key));
509    }
510    if col_names.contains(&resource_key) {
511        return Some(expr_builder(resource_key));
512    }
513    warn!("tag key {} not found in table columns", key);
514    None
515}
516
517fn flatten_tag_filters(
518    tags: HashMap<String, JsonValue>,
519    col_names: &HashSet<String>,
520) -> ServerResult<Vec<Expr>> {
521    let filters = tags
522        .into_iter()
523        .filter_map(|(key, value)| {
524            if key == KEY_OTEL_STATUS_ERROR_KEY && value == JsonValue::Bool(true) {
525                return Some(col(SPAN_STATUS_CODE).eq(lit(SPAN_STATUS_ERROR)));
526            }
527
528            // TODO(shuiyisong): add more precise mapping from key to col name
529            let span_key = format!("\"span_attributes.{}\"", key);
530            let resource_key = format!("\"resource_attributes.{}\"", key);
531            match value {
532                JsonValue::String(value) => {
533                    check_col_and_build_expr(span_key, resource_key, &key, col_names, |k| {
534                        col(k).eq(lit(value))
535                    })
536                }
537                JsonValue::Number(value) => {
538                    if value.is_f64() {
539                        // safe to unwrap as checked previously
540                        let value = value.as_f64().unwrap();
541                        check_col_and_build_expr(span_key, resource_key, &key, col_names, |k| {
542                            col(k).eq(lit(value))
543                        })
544                    } else {
545                        let value = value.as_i64().unwrap();
546                        check_col_and_build_expr(span_key, resource_key, &key, col_names, |k| {
547                            col(k).eq(lit(value))
548                        })
549                    }
550                }
551                JsonValue::Bool(value) => {
552                    check_col_and_build_expr(span_key, resource_key, &key, col_names, |k| {
553                        col(k).eq(lit(value))
554                    })
555                }
556                JsonValue::Null => {
557                    check_col_and_build_expr(span_key, resource_key, &key, col_names, |k| {
558                        col(k).is_null()
559                    })
560                }
561                // not supported at the moment
562                JsonValue::Array(_value) => None,
563                JsonValue::Object(_value) => None,
564            }
565        })
566        .collect();
567    Ok(filters)
568}
569
570fn tags_filters(
571    dataframe: &DataFrame,
572    tags: HashMap<String, JsonValue>,
573    is_data_model_v1: bool,
574    col_names: &HashSet<String>,
575) -> ServerResult<Vec<Expr>> {
576    if is_data_model_v1 {
577        flatten_tag_filters(tags, col_names)
578    } else {
579        json_tag_filters(dataframe, tags)
580    }
581}
582
583// Get trace ids from the output in recordbatches.
584async fn trace_ids_from_output(output: Output) -> ServerResult<Vec<String>> {
585    if let OutputData::Stream(stream) = output.data {
586        let schema = stream.schema().clone();
587        let recordbatches = util::collect(stream)
588            .await
589            .context(CollectRecordbatchSnafu)?;
590
591        // Only contains `trace_id` column in string type.
592        if !recordbatches.is_empty()
593            && schema.num_columns() == 1
594            && schema.contains_column(TRACE_ID_COLUMN)
595        {
596            let mut trace_ids = vec![];
597            for recordbatch in recordbatches {
598                for col in recordbatch.columns().iter() {
599                    for row_idx in 0..recordbatch.num_rows() {
600                        if let ValueRef::String(value) = col.get_ref(row_idx) {
601                            trace_ids.push(value.to_string());
602                        }
603                    }
604                }
605            }
606
607            return Ok(trace_ids);
608        }
609    }
610
611    Ok(vec![])
612}