frontend/instance/
jaeger.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
use std::collections::HashMap;
use std::sync::Arc;

use async_trait::async_trait;
use catalog::CatalogManagerRef;
use common_catalog::consts::{TRACE_TABLE_NAME, trace_services_table_name};
use common_function::function::FunctionRef;
use common_function::scalars::json::json_get::{
    JsonGetBool, JsonGetFloat, JsonGetInt, JsonGetString,
};
use common_function::scalars::udf::create_udf;
use common_query::{Output, OutputData};
use common_recordbatch::adapter::RecordBatchStreamAdapter;
use common_recordbatch::util;
use datafusion::dataframe::DataFrame;
use datafusion::execution::SessionStateBuilder;
use datafusion::execution::context::SessionContext;
use datafusion_expr::select_expr::SelectExpr;
use datafusion_expr::{Expr, SortExpr, col, ident, lit, lit_timestamp_nano, wildcard};
use datatypes::value::ValueRef;
use query::QueryEngineRef;
use serde_json::Value as JsonValue;
use servers::error::{
    CatalogSnafu, CollectRecordbatchSnafu, DataFusionSnafu, Result as ServerResult,
    TableNotFoundSnafu,
};
use servers::http::jaeger::{JAEGER_QUERY_TABLE_NAME_KEY, QueryTraceParams};
use servers::otlp::trace::{
    DURATION_NANO_COLUMN, SERVICE_NAME_COLUMN, SPAN_ATTRIBUTES_COLUMN, SPAN_KIND_COLUMN,
    SPAN_KIND_PREFIX, SPAN_NAME_COLUMN, TIMESTAMP_COLUMN, TRACE_ID_COLUMN,
};
use servers::query_handler::JaegerQueryHandler;
use session::context::QueryContextRef;
use snafu::{OptionExt, ResultExt};
use table::requests::{TABLE_DATA_MODEL, TABLE_DATA_MODEL_TRACE_V1};
use table::table::adapter::DfTableProviderAdapter;

use crate::instance::Instance;
53
/// Row limit used by `get_operations` and `get_trace`, and the fallback trace limit for
/// `find_traces` when the request does not specify one.
const DEFAULT_LIMIT: usize = 2_000;
55
56#[async_trait]
57impl JaegerQueryHandler for Instance {
58    async fn get_services(&self, ctx: QueryContextRef) -> ServerResult<Output> {
59        // It's equivalent to `SELECT DISTINCT(service_name) FROM {db}.{trace_table}`.
60        Ok(query_trace_table(
61            ctx,
62            self.catalog_manager(),
63            self.query_engine(),
64            vec![SelectExpr::from(col(SERVICE_NAME_COLUMN))],
65            vec![],
66            vec![],
67            None,
68            None,
69            vec![col(SERVICE_NAME_COLUMN)],
70        )
71        .await?)
72    }
73
74    async fn get_operations(
75        &self,
76        ctx: QueryContextRef,
77        service_name: &str,
78        span_kind: Option<&str>,
79        start_time: Option<i64>,
80        end_time: Option<i64>,
81    ) -> ServerResult<Output> {
82        let mut filters = vec![col(SERVICE_NAME_COLUMN).eq(lit(service_name))];
83
84        if let Some(span_kind) = span_kind {
85            filters.push(col(SPAN_KIND_COLUMN).eq(lit(format!(
86                "{}{}",
87                SPAN_KIND_PREFIX,
88                span_kind.to_uppercase()
89            ))));
90        }
91
92        if let Some(start_time) = start_time {
93            // Microseconds to nanoseconds.
94            filters.push(col(TIMESTAMP_COLUMN).gt_eq(lit_timestamp_nano(start_time * 1_000)));
95        }
96
97        if let Some(end_time) = end_time {
98            // Microseconds to nanoseconds.
99            filters.push(col(TIMESTAMP_COLUMN).lt_eq(lit_timestamp_nano(end_time * 1_000)));
100        }
101
102        // It's equivalent to the following SQL query:
103        //
104        // ```
105        // SELECT DISTINCT span_name, span_kind
106        // FROM
107        //   {db}.{trace_table}
108        // WHERE
109        //   service_name = '{service_name}' AND
110        //   timestamp >= {start_time} AND
111        //   timestamp <= {end_time} AND
112        //   span_kind = '{span_kind}'
113        // ORDER BY
114        //   span_name ASC
115        // ```.
116        Ok(query_trace_table(
117            ctx,
118            self.catalog_manager(),
119            self.query_engine(),
120            vec![
121                SelectExpr::from(col(SPAN_NAME_COLUMN)),
122                SelectExpr::from(col(SPAN_KIND_COLUMN)),
123                SelectExpr::from(col(SERVICE_NAME_COLUMN)),
124                SelectExpr::from(col(TIMESTAMP_COLUMN)),
125            ],
126            filters,
127            vec![col(SPAN_NAME_COLUMN).sort(true, false)], // Sort by span_name in ascending order.
128            Some(DEFAULT_LIMIT),
129            None,
130            vec![col(SPAN_NAME_COLUMN), col(SPAN_KIND_COLUMN)],
131        )
132        .await?)
133    }
134
135    async fn get_trace(
136        &self,
137        ctx: QueryContextRef,
138        trace_id: &str,
139        start_time: Option<i64>,
140        end_time: Option<i64>,
141    ) -> ServerResult<Output> {
142        // It's equivalent to the following SQL query:
143        //
144        // ```
145        // SELECT
146        //   *
147        // FROM
148        //   {db}.{trace_table}
149        // WHERE
150        //   trace_id = '{trace_id}' AND
151        //   timestamp >= {start_time} AND
152        //   timestamp <= {end_time}
153        // ORDER BY
154        //   timestamp DESC
155        // ```.
156        let selects = vec![wildcard()];
157
158        let mut filters = vec![col(TRACE_ID_COLUMN).eq(lit(trace_id))];
159
160        if let Some(start_time) = start_time {
161            filters.push(col(TIMESTAMP_COLUMN).gt_eq(lit_timestamp_nano(start_time)));
162        }
163
164        if let Some(end_time) = end_time {
165            filters.push(col(TIMESTAMP_COLUMN).lt_eq(lit_timestamp_nano(end_time)));
166        }
167
168        Ok(query_trace_table(
169            ctx,
170            self.catalog_manager(),
171            self.query_engine(),
172            selects,
173            filters,
174            vec![col(TIMESTAMP_COLUMN).sort(false, false)], // Sort by timestamp in descending order.
175            Some(DEFAULT_LIMIT),
176            None,
177            vec![],
178        )
179        .await?)
180    }
181
182    async fn find_traces(
183        &self,
184        ctx: QueryContextRef,
185        query_params: QueryTraceParams,
186    ) -> ServerResult<Output> {
187        let mut filters = vec![];
188
189        // `service_name` is already validated in `from_jaeger_query_params()`, so no additional check needed here.
190        filters.push(col(SERVICE_NAME_COLUMN).eq(lit(query_params.service_name)));
191
192        if let Some(operation_name) = query_params.operation_name {
193            filters.push(col(SPAN_NAME_COLUMN).eq(lit(operation_name)));
194        }
195
196        if let Some(start_time) = query_params.start_time {
197            filters.push(col(TIMESTAMP_COLUMN).gt_eq(lit_timestamp_nano(start_time)));
198        }
199
200        if let Some(end_time) = query_params.end_time {
201            filters.push(col(TIMESTAMP_COLUMN).lt_eq(lit_timestamp_nano(end_time)));
202        }
203
204        if let Some(min_duration) = query_params.min_duration {
205            filters.push(col(DURATION_NANO_COLUMN).gt_eq(lit(min_duration)));
206        }
207
208        if let Some(max_duration) = query_params.max_duration {
209            filters.push(col(DURATION_NANO_COLUMN).lt_eq(lit(max_duration)));
210        }
211
212        // Get all distinct trace ids that match the filters.
213        // It's equivalent to the following SQL query:
214        //
215        // ```
216        // SELECT DISTINCT trace_id
217        // FROM
218        //   {db}.{trace_table}
219        // WHERE
220        //   service_name = '{service_name}' AND
221        //   operation_name = '{operation_name}' AND
222        //   timestamp >= {start_time} AND
223        //   timestamp <= {end_time} AND
224        //   duration >= {min_duration} AND
225        //   duration <= {max_duration}
226        // LIMIT {limit}
227        // ```.
228        let output = query_trace_table(
229            ctx.clone(),
230            self.catalog_manager(),
231            self.query_engine(),
232            vec![wildcard()],
233            filters,
234            vec![],
235            Some(query_params.limit.unwrap_or(DEFAULT_LIMIT)),
236            query_params.tags,
237            vec![col(TRACE_ID_COLUMN)],
238        )
239        .await?;
240
241        // Get all traces that match the trace ids from the previous query.
242        // It's equivalent to the following SQL query:
243        //
244        // ```
245        // SELECT *
246        // FROM
247        //   {db}.{trace_table}
248        // WHERE
249        //   trace_id IN ({trace_ids}) AND
250        //   timestamp >= {start_time} AND
251        //   timestamp <= {end_time}
252        // ```
253        let mut filters = vec![
254            col(TRACE_ID_COLUMN).in_list(
255                trace_ids_from_output(output)
256                    .await?
257                    .iter()
258                    .map(lit)
259                    .collect::<Vec<Expr>>(),
260                false,
261            ),
262        ];
263
264        if let Some(start_time) = query_params.start_time {
265            filters.push(col(TIMESTAMP_COLUMN).gt_eq(lit_timestamp_nano(start_time)));
266        }
267
268        if let Some(end_time) = query_params.end_time {
269            filters.push(col(TIMESTAMP_COLUMN).lt_eq(lit_timestamp_nano(end_time)));
270        }
271
272        Ok(query_trace_table(
273            ctx,
274            self.catalog_manager(),
275            self.query_engine(),
276            vec![wildcard()],
277            filters,
278            vec![],
279            None,
280            None,
281            vec![],
282        )
283        .await?)
284    }
285}
286
287#[allow(clippy::too_many_arguments)]
288async fn query_trace_table(
289    ctx: QueryContextRef,
290    catalog_manager: &CatalogManagerRef,
291    query_engine: &QueryEngineRef,
292    selects: Vec<SelectExpr>,
293    filters: Vec<Expr>,
294    sorts: Vec<SortExpr>,
295    limit: Option<usize>,
296    tags: Option<HashMap<String, JsonValue>>,
297    distincts: Vec<Expr>,
298) -> ServerResult<Output> {
299    let trace_table_name = ctx
300        .extension(JAEGER_QUERY_TABLE_NAME_KEY)
301        .unwrap_or(TRACE_TABLE_NAME);
302
303    // If only select services, use the trace services table.
304    let table_name = {
305        if match selects.as_slice() {
306            [SelectExpr::Expression(x)] => x == &col(SERVICE_NAME_COLUMN),
307            _ => false,
308        } {
309            &trace_services_table_name(trace_table_name)
310        } else {
311            trace_table_name
312        }
313    };
314
315    let table = catalog_manager
316        .table(
317            ctx.current_catalog(),
318            &ctx.current_schema(),
319            table_name,
320            Some(&ctx),
321        )
322        .await
323        .context(CatalogSnafu)?
324        .with_context(|| TableNotFoundSnafu {
325            table: table_name,
326            catalog: ctx.current_catalog(),
327            schema: ctx.current_schema(),
328        })?;
329
330    let is_data_model_v1 = table
331        .table_info()
332        .meta
333        .options
334        .extra_options
335        .get(TABLE_DATA_MODEL)
336        .map(|s| s.as_str())
337        == Some(TABLE_DATA_MODEL_TRACE_V1);
338
339    let df_context = create_df_context(query_engine)?;
340
341    let dataframe = df_context
342        .read_table(Arc::new(DfTableProviderAdapter::new(table)))
343        .context(DataFusionSnafu)?;
344
345    let dataframe = dataframe.select(selects).context(DataFusionSnafu)?;
346
347    // Apply all filters.
348    let dataframe = filters
349        .into_iter()
350        .chain(tags.map_or(Ok(vec![]), |t| {
351            tags_filters(&dataframe, t, is_data_model_v1)
352        })?)
353        .try_fold(dataframe, |df, expr| {
354            df.filter(expr).context(DataFusionSnafu)
355        })?;
356
357    // Apply the distinct if needed.
358    let dataframe = if !distincts.is_empty() {
359        dataframe
360            .distinct_on(distincts.clone(), distincts, None)
361            .context(DataFusionSnafu)?
362    } else {
363        dataframe
364    };
365
366    // Apply the sorts if needed.
367    let dataframe = if !sorts.is_empty() {
368        dataframe.sort(sorts).context(DataFusionSnafu)?
369    } else {
370        dataframe
371    };
372
373    // Apply the limit if needed.
374    let dataframe = if let Some(limit) = limit {
375        dataframe.limit(0, Some(limit)).context(DataFusionSnafu)?
376    } else {
377        dataframe
378    };
379
380    // Execute the query and collect the result.
381    let stream = dataframe.execute_stream().await.context(DataFusionSnafu)?;
382
383    let output = Output::new_with_stream(Box::pin(
384        RecordBatchStreamAdapter::try_new(stream).context(CollectRecordbatchSnafu)?,
385    ));
386
387    Ok(output)
388}
389
390// The current implementation registers UDFs during the planning stage, which makes it difficult
391// to utilize them through DataFrame APIs. To address this limitation, we create a new session
392// context and register the required UDFs, allowing them to be decoupled from the global context.
393// TODO(zyy17): Is it possible or necessary to reuse the existing session context?
394fn create_df_context(query_engine: &QueryEngineRef) -> ServerResult<SessionContext> {
395    let df_context = SessionContext::new_with_state(
396        SessionStateBuilder::new_from_existing(query_engine.engine_state().session_state()).build(),
397    );
398
399    // The following JSON UDFs will be used for tags filters on v0 data model.
400    let udfs: Vec<FunctionRef> = vec![
401        Arc::new(JsonGetInt::default()),
402        Arc::new(JsonGetFloat::default()),
403        Arc::new(JsonGetBool::default()),
404        Arc::new(JsonGetString::default()),
405    ];
406
407    for udf in udfs {
408        df_context.register_udf(create_udf(udf));
409    }
410
411    Ok(df_context)
412}
413
414fn json_tag_filters(
415    dataframe: &DataFrame,
416    tags: HashMap<String, JsonValue>,
417) -> ServerResult<Vec<Expr>> {
418    let mut filters = vec![];
419
420    // NOTE: The key of the tags may contain `.`, for example: `http.status_code`, so we need to use `["http.status_code"]` in json path to access the value.
421    for (key, value) in tags.iter() {
422        if let JsonValue::String(value) = value {
423            filters.push(
424                dataframe
425                    .registry()
426                    .udf(JsonGetString::NAME)
427                    .context(DataFusionSnafu)?
428                    .call(vec![
429                        col(SPAN_ATTRIBUTES_COLUMN),
430                        lit(format!("[\"{}\"]", key)),
431                    ])
432                    .eq(lit(value)),
433            );
434        }
435        if let JsonValue::Number(value) = value {
436            if value.is_i64() {
437                filters.push(
438                    dataframe
439                        .registry()
440                        .udf(JsonGetInt::NAME)
441                        .context(DataFusionSnafu)?
442                        .call(vec![
443                            col(SPAN_ATTRIBUTES_COLUMN),
444                            lit(format!("[\"{}\"]", key)),
445                        ])
446                        .eq(lit(value.as_i64().unwrap())),
447                );
448            }
449            if value.is_f64() {
450                filters.push(
451                    dataframe
452                        .registry()
453                        .udf(JsonGetFloat::NAME)
454                        .context(DataFusionSnafu)?
455                        .call(vec![
456                            col(SPAN_ATTRIBUTES_COLUMN),
457                            lit(format!("[\"{}\"]", key)),
458                        ])
459                        .eq(lit(value.as_f64().unwrap())),
460                );
461            }
462        }
463        if let JsonValue::Bool(value) = value {
464            filters.push(
465                dataframe
466                    .registry()
467                    .udf(JsonGetBool::NAME)
468                    .context(DataFusionSnafu)?
469                    .call(vec![
470                        col(SPAN_ATTRIBUTES_COLUMN),
471                        lit(format!("[\"{}\"]", key)),
472                    ])
473                    .eq(lit(*value)),
474            );
475        }
476    }
477
478    Ok(filters)
479}
480
481fn flatten_tag_filters(tags: HashMap<String, JsonValue>) -> ServerResult<Vec<Expr>> {
482    let filters = tags
483        .into_iter()
484        .filter_map(|(key, value)| {
485            let key = format!("\"span_attributes.{}\"", key);
486            match value {
487                JsonValue::String(value) => Some(col(key).eq(lit(value))),
488                JsonValue::Number(value) => {
489                    if value.is_f64() {
490                        // safe to unwrap as checked previously
491                        Some(col(key).eq(lit(value.as_f64().unwrap())))
492                    } else {
493                        Some(col(key).eq(lit(value.as_i64().unwrap())))
494                    }
495                }
496                JsonValue::Bool(value) => Some(col(key).eq(lit(value))),
497                JsonValue::Null => Some(col(key).is_null()),
498                // not supported at the moment
499                JsonValue::Array(_value) => None,
500                JsonValue::Object(_value) => None,
501            }
502        })
503        .collect();
504    Ok(filters)
505}
506
507fn tags_filters(
508    dataframe: &DataFrame,
509    tags: HashMap<String, JsonValue>,
510    is_data_model_v1: bool,
511) -> ServerResult<Vec<Expr>> {
512    if is_data_model_v1 {
513        flatten_tag_filters(tags)
514    } else {
515        json_tag_filters(dataframe, tags)
516    }
517}
518
519// Get trace ids from the output in recordbatches.
520async fn trace_ids_from_output(output: Output) -> ServerResult<Vec<String>> {
521    if let OutputData::Stream(stream) = output.data {
522        let schema = stream.schema().clone();
523        let recordbatches = util::collect(stream)
524            .await
525            .context(CollectRecordbatchSnafu)?;
526
527        // Only contains `trace_id` column in string type.
528        if !recordbatches.is_empty()
529            && schema.num_columns() == 1
530            && schema.contains_column(TRACE_ID_COLUMN)
531        {
532            let mut trace_ids = vec![];
533            for recordbatch in recordbatches {
534                for col in recordbatch.columns().iter() {
535                    for row_idx in 0..recordbatch.num_rows() {
536                        if let ValueRef::String(value) = col.get_ref(row_idx) {
537                            trace_ids.push(value.to_string());
538                        }
539                    }
540                }
541            }
542
543            return Ok(trace_ids);
544        }
545    }
546
547    Ok(vec![])
548}