frontend/instance/
jaeger.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::sync::Arc;
17
18use async_trait::async_trait;
19use catalog::CatalogManagerRef;
20use common_catalog::consts::{TRACE_TABLE_NAME, trace_services_table_name};
21use common_function::function::{Function, FunctionRef};
22use common_function::scalars::json::json_get::{
23    JsonGetBool, JsonGetFloat, JsonGetInt, JsonGetString,
24};
25use common_function::scalars::udf::create_udf;
26use common_function::state::FunctionState;
27use common_query::{Output, OutputData};
28use common_recordbatch::adapter::RecordBatchStreamAdapter;
29use common_recordbatch::util;
30use datafusion::dataframe::DataFrame;
31use datafusion::execution::SessionStateBuilder;
32use datafusion::execution::context::SessionContext;
33use datafusion_expr::select_expr::SelectExpr;
34use datafusion_expr::{Expr, SortExpr, col, lit, lit_timestamp_nano, wildcard};
35use datatypes::value::ValueRef;
36use query::QueryEngineRef;
37use serde_json::Value as JsonValue;
38use servers::error::{
39    CatalogSnafu, CollectRecordbatchSnafu, DataFusionSnafu, Result as ServerResult,
40    TableNotFoundSnafu,
41};
42use servers::http::jaeger::{JAEGER_QUERY_TABLE_NAME_KEY, QueryTraceParams};
43use servers::otlp::trace::{
44    DURATION_NANO_COLUMN, SERVICE_NAME_COLUMN, SPAN_ATTRIBUTES_COLUMN, SPAN_KIND_COLUMN,
45    SPAN_KIND_PREFIX, SPAN_NAME_COLUMN, TIMESTAMP_COLUMN, TRACE_ID_COLUMN,
46};
47use servers::query_handler::JaegerQueryHandler;
48use session::context::QueryContextRef;
49use snafu::{OptionExt, ResultExt};
50use table::requests::{TABLE_DATA_MODEL, TABLE_DATA_MODEL_TRACE_V1};
51use table::table::adapter::DfTableProviderAdapter;
52
53use crate::instance::Instance;
54
/// Row limit applied to `get_operations` and `get_trace`, and the fallback used by `find_traces`
/// when the request does not carry its own limit.
const DEFAULT_LIMIT: usize = 2000;
56
57#[async_trait]
58impl JaegerQueryHandler for Instance {
59    async fn get_services(&self, ctx: QueryContextRef) -> ServerResult<Output> {
60        // It's equivalent to `SELECT DISTINCT(service_name) FROM {db}.{trace_table}`.
61        Ok(query_trace_table(
62            ctx,
63            self.catalog_manager(),
64            self.query_engine(),
65            vec![SelectExpr::from(col(SERVICE_NAME_COLUMN))],
66            vec![],
67            vec![],
68            None,
69            None,
70            vec![col(SERVICE_NAME_COLUMN)],
71        )
72        .await?)
73    }
74
75    async fn get_operations(
76        &self,
77        ctx: QueryContextRef,
78        service_name: &str,
79        span_kind: Option<&str>,
80        start_time: Option<i64>,
81        end_time: Option<i64>,
82    ) -> ServerResult<Output> {
83        let mut filters = vec![col(SERVICE_NAME_COLUMN).eq(lit(service_name))];
84
85        if let Some(span_kind) = span_kind {
86            filters.push(col(SPAN_KIND_COLUMN).eq(lit(format!(
87                "{}{}",
88                SPAN_KIND_PREFIX,
89                span_kind.to_uppercase()
90            ))));
91        }
92
93        if let Some(start_time) = start_time {
94            // Microseconds to nanoseconds.
95            filters.push(col(TIMESTAMP_COLUMN).gt_eq(lit_timestamp_nano(start_time * 1_000)));
96        }
97
98        if let Some(end_time) = end_time {
99            // Microseconds to nanoseconds.
100            filters.push(col(TIMESTAMP_COLUMN).lt_eq(lit_timestamp_nano(end_time * 1_000)));
101        }
102
103        // It's equivalent to the following SQL query:
104        //
105        // ```
106        // SELECT DISTINCT span_name, span_kind
107        // FROM
108        //   {db}.{trace_table}
109        // WHERE
110        //   service_name = '{service_name}' AND
111        //   timestamp >= {start_time} AND
112        //   timestamp <= {end_time} AND
113        //   span_kind = '{span_kind}'
114        // ORDER BY
115        //   span_name ASC
116        // ```.
117        Ok(query_trace_table(
118            ctx,
119            self.catalog_manager(),
120            self.query_engine(),
121            vec![
122                SelectExpr::from(col(SPAN_NAME_COLUMN)),
123                SelectExpr::from(col(SPAN_KIND_COLUMN)),
124                SelectExpr::from(col(SERVICE_NAME_COLUMN)),
125                SelectExpr::from(col(TIMESTAMP_COLUMN)),
126            ],
127            filters,
128            vec![col(SPAN_NAME_COLUMN).sort(true, false)], // Sort by span_name in ascending order.
129            Some(DEFAULT_LIMIT),
130            None,
131            vec![col(SPAN_NAME_COLUMN), col(SPAN_KIND_COLUMN)],
132        )
133        .await?)
134    }
135
136    async fn get_trace(
137        &self,
138        ctx: QueryContextRef,
139        trace_id: &str,
140        start_time: Option<i64>,
141        end_time: Option<i64>,
142    ) -> ServerResult<Output> {
143        // It's equivalent to the following SQL query:
144        //
145        // ```
146        // SELECT
147        //   *
148        // FROM
149        //   {db}.{trace_table}
150        // WHERE
151        //   trace_id = '{trace_id}' AND
152        //   timestamp >= {start_time} AND
153        //   timestamp <= {end_time}
154        // ORDER BY
155        //   timestamp DESC
156        // ```.
157        let selects = vec![wildcard()];
158
159        let mut filters = vec![col(TRACE_ID_COLUMN).eq(lit(trace_id))];
160
161        if let Some(start_time) = start_time {
162            filters.push(col(TIMESTAMP_COLUMN).gt_eq(lit_timestamp_nano(start_time)));
163        }
164
165        if let Some(end_time) = end_time {
166            filters.push(col(TIMESTAMP_COLUMN).lt_eq(lit_timestamp_nano(end_time)));
167        }
168
169        Ok(query_trace_table(
170            ctx,
171            self.catalog_manager(),
172            self.query_engine(),
173            selects,
174            filters,
175            vec![col(TIMESTAMP_COLUMN).sort(false, false)], // Sort by timestamp in descending order.
176            Some(DEFAULT_LIMIT),
177            None,
178            vec![],
179        )
180        .await?)
181    }
182
183    async fn find_traces(
184        &self,
185        ctx: QueryContextRef,
186        query_params: QueryTraceParams,
187    ) -> ServerResult<Output> {
188        let mut filters = vec![];
189
190        // `service_name` is already validated in `from_jaeger_query_params()`, so no additional check needed here.
191        filters.push(col(SERVICE_NAME_COLUMN).eq(lit(query_params.service_name)));
192
193        if let Some(operation_name) = query_params.operation_name {
194            filters.push(col(SPAN_NAME_COLUMN).eq(lit(operation_name)));
195        }
196
197        if let Some(start_time) = query_params.start_time {
198            filters.push(col(TIMESTAMP_COLUMN).gt_eq(lit_timestamp_nano(start_time)));
199        }
200
201        if let Some(end_time) = query_params.end_time {
202            filters.push(col(TIMESTAMP_COLUMN).lt_eq(lit_timestamp_nano(end_time)));
203        }
204
205        if let Some(min_duration) = query_params.min_duration {
206            filters.push(col(DURATION_NANO_COLUMN).gt_eq(lit(min_duration)));
207        }
208
209        if let Some(max_duration) = query_params.max_duration {
210            filters.push(col(DURATION_NANO_COLUMN).lt_eq(lit(max_duration)));
211        }
212
213        // Get all distinct trace ids that match the filters.
214        // It's equivalent to the following SQL query:
215        //
216        // ```
217        // SELECT DISTINCT trace_id
218        // FROM
219        //   {db}.{trace_table}
220        // WHERE
221        //   service_name = '{service_name}' AND
222        //   operation_name = '{operation_name}' AND
223        //   timestamp >= {start_time} AND
224        //   timestamp <= {end_time} AND
225        //   duration >= {min_duration} AND
226        //   duration <= {max_duration}
227        // LIMIT {limit}
228        // ```.
229        let output = query_trace_table(
230            ctx.clone(),
231            self.catalog_manager(),
232            self.query_engine(),
233            vec![wildcard()],
234            filters,
235            vec![],
236            Some(query_params.limit.unwrap_or(DEFAULT_LIMIT)),
237            query_params.tags,
238            vec![col(TRACE_ID_COLUMN)],
239        )
240        .await?;
241
242        // Get all traces that match the trace ids from the previous query.
243        // It's equivalent to the following SQL query:
244        //
245        // ```
246        // SELECT *
247        // FROM
248        //   {db}.{trace_table}
249        // WHERE
250        //   trace_id IN ({trace_ids}) AND
251        //   timestamp >= {start_time} AND
252        //   timestamp <= {end_time}
253        // ```
254        let mut filters = vec![
255            col(TRACE_ID_COLUMN).in_list(
256                trace_ids_from_output(output)
257                    .await?
258                    .iter()
259                    .map(lit)
260                    .collect::<Vec<Expr>>(),
261                false,
262            ),
263        ];
264
265        if let Some(start_time) = query_params.start_time {
266            filters.push(col(TIMESTAMP_COLUMN).gt_eq(lit_timestamp_nano(start_time)));
267        }
268
269        if let Some(end_time) = query_params.end_time {
270            filters.push(col(TIMESTAMP_COLUMN).lt_eq(lit_timestamp_nano(end_time)));
271        }
272
273        Ok(query_trace_table(
274            ctx,
275            self.catalog_manager(),
276            self.query_engine(),
277            vec![wildcard()],
278            filters,
279            vec![],
280            None,
281            None,
282            vec![],
283        )
284        .await?)
285    }
286}
287
288#[allow(clippy::too_many_arguments)]
289async fn query_trace_table(
290    ctx: QueryContextRef,
291    catalog_manager: &CatalogManagerRef,
292    query_engine: &QueryEngineRef,
293    selects: Vec<SelectExpr>,
294    filters: Vec<Expr>,
295    sorts: Vec<SortExpr>,
296    limit: Option<usize>,
297    tags: Option<HashMap<String, JsonValue>>,
298    distincts: Vec<Expr>,
299) -> ServerResult<Output> {
300    let trace_table_name = ctx
301        .extension(JAEGER_QUERY_TABLE_NAME_KEY)
302        .unwrap_or(TRACE_TABLE_NAME);
303
304    // If only select services, use the trace services table.
305    let table_name = {
306        if match selects.as_slice() {
307            [SelectExpr::Expression(x)] => x == &col(SERVICE_NAME_COLUMN),
308            _ => false,
309        } {
310            &trace_services_table_name(trace_table_name)
311        } else {
312            trace_table_name
313        }
314    };
315
316    let table = catalog_manager
317        .table(
318            ctx.current_catalog(),
319            &ctx.current_schema(),
320            table_name,
321            Some(&ctx),
322        )
323        .await
324        .context(CatalogSnafu)?
325        .with_context(|| TableNotFoundSnafu {
326            table: table_name,
327            catalog: ctx.current_catalog(),
328            schema: ctx.current_schema(),
329        })?;
330
331    let is_data_model_v1 = table
332        .table_info()
333        .meta
334        .options
335        .extra_options
336        .get(TABLE_DATA_MODEL)
337        .map(|s| s.as_str())
338        == Some(TABLE_DATA_MODEL_TRACE_V1);
339
340    let df_context = create_df_context(query_engine, ctx.clone())?;
341
342    let dataframe = df_context
343        .read_table(Arc::new(DfTableProviderAdapter::new(table)))
344        .context(DataFusionSnafu)?;
345
346    let dataframe = dataframe.select(selects).context(DataFusionSnafu)?;
347
348    // Apply all filters.
349    let dataframe = filters
350        .into_iter()
351        .chain(tags.map_or(Ok(vec![]), |t| {
352            tags_filters(&dataframe, t, is_data_model_v1)
353        })?)
354        .try_fold(dataframe, |df, expr| {
355            df.filter(expr).context(DataFusionSnafu)
356        })?;
357
358    // Apply the distinct if needed.
359    let dataframe = if !distincts.is_empty() {
360        dataframe
361            .distinct_on(distincts.clone(), distincts, None)
362            .context(DataFusionSnafu)?
363    } else {
364        dataframe
365    };
366
367    // Apply the sorts if needed.
368    let dataframe = if !sorts.is_empty() {
369        dataframe.sort(sorts).context(DataFusionSnafu)?
370    } else {
371        dataframe
372    };
373
374    // Apply the limit if needed.
375    let dataframe = if let Some(limit) = limit {
376        dataframe.limit(0, Some(limit)).context(DataFusionSnafu)?
377    } else {
378        dataframe
379    };
380
381    // Execute the query and collect the result.
382    let stream = dataframe.execute_stream().await.context(DataFusionSnafu)?;
383
384    let output = Output::new_with_stream(Box::pin(
385        RecordBatchStreamAdapter::try_new(stream).context(CollectRecordbatchSnafu)?,
386    ));
387
388    Ok(output)
389}
390
391// The current implementation registers UDFs during the planning stage, which makes it difficult
392// to utilize them through DataFrame APIs. To address this limitation, we create a new session
393// context and register the required UDFs, allowing them to be decoupled from the global context.
394// TODO(zyy17): Is it possible or necessary to reuse the existing session context?
395fn create_df_context(
396    query_engine: &QueryEngineRef,
397    ctx: QueryContextRef,
398) -> ServerResult<SessionContext> {
399    let df_context = SessionContext::new_with_state(
400        SessionStateBuilder::new_from_existing(query_engine.engine_state().session_state()).build(),
401    );
402
403    // The following JSON UDFs will be used for tags filters on v0 data model.
404    let udfs: Vec<FunctionRef> = vec![
405        Arc::new(JsonGetInt),
406        Arc::new(JsonGetFloat),
407        Arc::new(JsonGetBool),
408        Arc::new(JsonGetString),
409    ];
410
411    for udf in udfs {
412        df_context.register_udf(create_udf(
413            udf,
414            ctx.clone(),
415            Arc::new(FunctionState::default()),
416        ));
417    }
418
419    Ok(df_context)
420}
421
422fn json_tag_filters(
423    dataframe: &DataFrame,
424    tags: HashMap<String, JsonValue>,
425) -> ServerResult<Vec<Expr>> {
426    let mut filters = vec![];
427
428    // NOTE: The key of the tags may contain `.`, for example: `http.status_code`, so we need to use `["http.status_code"]` in json path to access the value.
429    for (key, value) in tags.iter() {
430        if let JsonValue::String(value) = value {
431            filters.push(
432                dataframe
433                    .registry()
434                    .udf(JsonGetString {}.name())
435                    .context(DataFusionSnafu)?
436                    .call(vec![
437                        col(SPAN_ATTRIBUTES_COLUMN),
438                        lit(format!("[\"{}\"]", key)),
439                    ])
440                    .eq(lit(value)),
441            );
442        }
443        if let JsonValue::Number(value) = value {
444            if value.is_i64() {
445                filters.push(
446                    dataframe
447                        .registry()
448                        .udf(JsonGetInt {}.name())
449                        .context(DataFusionSnafu)?
450                        .call(vec![
451                            col(SPAN_ATTRIBUTES_COLUMN),
452                            lit(format!("[\"{}\"]", key)),
453                        ])
454                        .eq(lit(value.as_i64().unwrap())),
455                );
456            }
457            if value.is_f64() {
458                filters.push(
459                    dataframe
460                        .registry()
461                        .udf(JsonGetFloat {}.name())
462                        .context(DataFusionSnafu)?
463                        .call(vec![
464                            col(SPAN_ATTRIBUTES_COLUMN),
465                            lit(format!("[\"{}\"]", key)),
466                        ])
467                        .eq(lit(value.as_f64().unwrap())),
468                );
469            }
470        }
471        if let JsonValue::Bool(value) = value {
472            filters.push(
473                dataframe
474                    .registry()
475                    .udf(JsonGetBool {}.name())
476                    .context(DataFusionSnafu)?
477                    .call(vec![
478                        col(SPAN_ATTRIBUTES_COLUMN),
479                        lit(format!("[\"{}\"]", key)),
480                    ])
481                    .eq(lit(*value)),
482            );
483        }
484    }
485
486    Ok(filters)
487}
488
489fn flatten_tag_filters(tags: HashMap<String, JsonValue>) -> ServerResult<Vec<Expr>> {
490    let filters = tags
491        .into_iter()
492        .filter_map(|(key, value)| {
493            let key = format!("\"span_attributes.{}\"", key);
494            match value {
495                JsonValue::String(value) => Some(col(key).eq(lit(value))),
496                JsonValue::Number(value) => {
497                    if value.is_f64() {
498                        // safe to unwrap as checked previously
499                        Some(col(key).eq(lit(value.as_f64().unwrap())))
500                    } else {
501                        Some(col(key).eq(lit(value.as_i64().unwrap())))
502                    }
503                }
504                JsonValue::Bool(value) => Some(col(key).eq(lit(value))),
505                JsonValue::Null => Some(col(key).is_null()),
506                // not supported at the moment
507                JsonValue::Array(_value) => None,
508                JsonValue::Object(_value) => None,
509            }
510        })
511        .collect();
512    Ok(filters)
513}
514
515fn tags_filters(
516    dataframe: &DataFrame,
517    tags: HashMap<String, JsonValue>,
518    is_data_model_v1: bool,
519) -> ServerResult<Vec<Expr>> {
520    if is_data_model_v1 {
521        flatten_tag_filters(tags)
522    } else {
523        json_tag_filters(dataframe, tags)
524    }
525}
526
527// Get trace ids from the output in recordbatches.
528async fn trace_ids_from_output(output: Output) -> ServerResult<Vec<String>> {
529    if let OutputData::Stream(stream) = output.data {
530        let schema = stream.schema().clone();
531        let recordbatches = util::collect(stream)
532            .await
533            .context(CollectRecordbatchSnafu)?;
534
535        // Only contains `trace_id` column in string type.
536        if !recordbatches.is_empty()
537            && schema.num_columns() == 1
538            && schema.contains_column(TRACE_ID_COLUMN)
539        {
540            let mut trace_ids = vec![];
541            for recordbatch in recordbatches {
542                for col in recordbatch.columns().iter() {
543                    for row_idx in 0..recordbatch.num_rows() {
544                        if let ValueRef::String(value) = col.get_ref(row_idx) {
545                            trace_ids.push(value.to_string());
546                        }
547                    }
548                }
549            }
550
551            return Ok(trace_ids);
552        }
553    }
554
555    Ok(vec![])
556}