frontend/instance/
jaeger.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::sync::Arc;
17
18use async_trait::async_trait;
19use catalog::CatalogManagerRef;
20use common_catalog::consts::{trace_services_table_name, TRACE_TABLE_NAME};
21use common_function::function::{Function, FunctionRef};
22use common_function::scalars::json::json_get::{
23    JsonGetBool, JsonGetFloat, JsonGetInt, JsonGetString,
24};
25use common_function::scalars::udf::create_udf;
26use common_function::state::FunctionState;
27use common_query::{Output, OutputData};
28use common_recordbatch::adapter::RecordBatchStreamAdapter;
29use common_recordbatch::util;
30use datafusion::dataframe::DataFrame;
31use datafusion::execution::context::SessionContext;
32use datafusion::execution::SessionStateBuilder;
33use datafusion_expr::select_expr::SelectExpr;
34use datafusion_expr::{col, lit, lit_timestamp_nano, wildcard, Expr, SortExpr};
35use datatypes::value::ValueRef;
36use query::QueryEngineRef;
37use serde_json::Value as JsonValue;
38use servers::error::{
39    CatalogSnafu, CollectRecordbatchSnafu, DataFusionSnafu, Result as ServerResult,
40    TableNotFoundSnafu,
41};
42use servers::http::jaeger::{QueryTraceParams, JAEGER_QUERY_TABLE_NAME_KEY};
43use servers::otlp::trace::{
44    DURATION_NANO_COLUMN, SERVICE_NAME_COLUMN, SPAN_ATTRIBUTES_COLUMN, SPAN_KIND_COLUMN,
45    SPAN_KIND_PREFIX, SPAN_NAME_COLUMN, TIMESTAMP_COLUMN, TRACE_ID_COLUMN,
46};
47use servers::query_handler::JaegerQueryHandler;
48use session::context::QueryContextRef;
49use snafu::{OptionExt, ResultExt};
50use table::requests::{TABLE_DATA_MODEL, TABLE_DATA_MODEL_TRACE_V1};
51use table::table::adapter::DfTableProviderAdapter;
52
53use crate::instance::Instance;
54
/// Row cap used by `get_operations` and `get_trace`, and by `find_traces` when the request
/// carries no explicit `limit`.
const DEFAULT_LIMIT: usize = 2_000;
56
57#[async_trait]
58impl JaegerQueryHandler for Instance {
59    async fn get_services(&self, ctx: QueryContextRef) -> ServerResult<Output> {
60        // It's equivalent to `SELECT DISTINCT(service_name) FROM {db}.{trace_table}`.
61        Ok(query_trace_table(
62            ctx,
63            self.catalog_manager(),
64            self.query_engine(),
65            vec![SelectExpr::from(col(SERVICE_NAME_COLUMN))],
66            vec![],
67            vec![],
68            None,
69            None,
70            vec![col(SERVICE_NAME_COLUMN)],
71        )
72        .await?)
73    }
74
75    async fn get_operations(
76        &self,
77        ctx: QueryContextRef,
78        service_name: &str,
79        span_kind: Option<&str>,
80        start_time: Option<i64>,
81        end_time: Option<i64>,
82    ) -> ServerResult<Output> {
83        let mut filters = vec![col(SERVICE_NAME_COLUMN).eq(lit(service_name))];
84
85        if let Some(span_kind) = span_kind {
86            filters.push(col(SPAN_KIND_COLUMN).eq(lit(format!(
87                "{}{}",
88                SPAN_KIND_PREFIX,
89                span_kind.to_uppercase()
90            ))));
91        }
92
93        if let Some(start_time) = start_time {
94            // Microseconds to nanoseconds.
95            filters.push(col(TIMESTAMP_COLUMN).gt_eq(lit_timestamp_nano(start_time * 1_000)));
96        }
97
98        if let Some(end_time) = end_time {
99            // Microseconds to nanoseconds.
100            filters.push(col(TIMESTAMP_COLUMN).lt_eq(lit_timestamp_nano(end_time * 1_000)));
101        }
102
103        // It's equivalent to the following SQL query:
104        //
105        // ```
106        // SELECT DISTINCT span_name, span_kind
107        // FROM
108        //   {db}.{trace_table}
109        // WHERE
110        //   service_name = '{service_name}' AND
111        //   timestamp >= {start_time} AND
112        //   timestamp <= {end_time} AND
113        //   span_kind = '{span_kind}'
114        // ORDER BY
115        //   span_name ASC
116        // ```.
117        Ok(query_trace_table(
118            ctx,
119            self.catalog_manager(),
120            self.query_engine(),
121            vec![
122                SelectExpr::from(col(SPAN_NAME_COLUMN)),
123                SelectExpr::from(col(SPAN_KIND_COLUMN)),
124                SelectExpr::from(col(SERVICE_NAME_COLUMN)),
125                SelectExpr::from(col(TIMESTAMP_COLUMN)),
126            ],
127            filters,
128            vec![col(SPAN_NAME_COLUMN).sort(true, false)], // Sort by span_name in ascending order.
129            Some(DEFAULT_LIMIT),
130            None,
131            vec![col(SPAN_NAME_COLUMN), col(SPAN_KIND_COLUMN)],
132        )
133        .await?)
134    }
135
136    async fn get_trace(
137        &self,
138        ctx: QueryContextRef,
139        trace_id: &str,
140        start_time: Option<i64>,
141        end_time: Option<i64>,
142    ) -> ServerResult<Output> {
143        // It's equivalent to the following SQL query:
144        //
145        // ```
146        // SELECT
147        //   *
148        // FROM
149        //   {db}.{trace_table}
150        // WHERE
151        //   trace_id = '{trace_id}' AND
152        //   timestamp >= {start_time} AND
153        //   timestamp <= {end_time}
154        // ORDER BY
155        //   timestamp DESC
156        // ```.
157        let selects = vec![wildcard()];
158
159        let mut filters = vec![col(TRACE_ID_COLUMN).eq(lit(trace_id))];
160
161        if let Some(start_time) = start_time {
162            filters.push(col(TIMESTAMP_COLUMN).gt_eq(lit_timestamp_nano(start_time)));
163        }
164
165        if let Some(end_time) = end_time {
166            filters.push(col(TIMESTAMP_COLUMN).lt_eq(lit_timestamp_nano(end_time)));
167        }
168
169        Ok(query_trace_table(
170            ctx,
171            self.catalog_manager(),
172            self.query_engine(),
173            selects,
174            filters,
175            vec![col(TIMESTAMP_COLUMN).sort(false, false)], // Sort by timestamp in descending order.
176            Some(DEFAULT_LIMIT),
177            None,
178            vec![],
179        )
180        .await?)
181    }
182
183    async fn find_traces(
184        &self,
185        ctx: QueryContextRef,
186        query_params: QueryTraceParams,
187    ) -> ServerResult<Output> {
188        let mut filters = vec![];
189
190        // `service_name` is already validated in `from_jaeger_query_params()`, so no additional check needed here.
191        filters.push(col(SERVICE_NAME_COLUMN).eq(lit(query_params.service_name)));
192
193        if let Some(operation_name) = query_params.operation_name {
194            filters.push(col(SPAN_NAME_COLUMN).eq(lit(operation_name)));
195        }
196
197        if let Some(start_time) = query_params.start_time {
198            filters.push(col(TIMESTAMP_COLUMN).gt_eq(lit_timestamp_nano(start_time)));
199        }
200
201        if let Some(end_time) = query_params.end_time {
202            filters.push(col(TIMESTAMP_COLUMN).lt_eq(lit_timestamp_nano(end_time)));
203        }
204
205        if let Some(min_duration) = query_params.min_duration {
206            filters.push(col(DURATION_NANO_COLUMN).gt_eq(lit(min_duration)));
207        }
208
209        if let Some(max_duration) = query_params.max_duration {
210            filters.push(col(DURATION_NANO_COLUMN).lt_eq(lit(max_duration)));
211        }
212
213        // Get all distinct trace ids that match the filters.
214        // It's equivalent to the following SQL query:
215        //
216        // ```
217        // SELECT DISTINCT trace_id
218        // FROM
219        //   {db}.{trace_table}
220        // WHERE
221        //   service_name = '{service_name}' AND
222        //   operation_name = '{operation_name}' AND
223        //   timestamp >= {start_time} AND
224        //   timestamp <= {end_time} AND
225        //   duration >= {min_duration} AND
226        //   duration <= {max_duration}
227        // LIMIT {limit}
228        // ```.
229        let output = query_trace_table(
230            ctx.clone(),
231            self.catalog_manager(),
232            self.query_engine(),
233            vec![wildcard()],
234            filters,
235            vec![],
236            Some(query_params.limit.unwrap_or(DEFAULT_LIMIT)),
237            query_params.tags,
238            vec![col(TRACE_ID_COLUMN)],
239        )
240        .await?;
241
242        // Get all traces that match the trace ids from the previous query.
243        // It's equivalent to the following SQL query:
244        //
245        // ```
246        // SELECT *
247        // FROM
248        //   {db}.{trace_table}
249        // WHERE
250        //   trace_id IN ({trace_ids}) AND
251        //   timestamp >= {start_time} AND
252        //   timestamp <= {end_time}
253        // ```
254        let mut filters = vec![col(TRACE_ID_COLUMN).in_list(
255            trace_ids_from_output(output)
256                .await?
257                .iter()
258                .map(lit)
259                .collect::<Vec<Expr>>(),
260            false,
261        )];
262
263        if let Some(start_time) = query_params.start_time {
264            filters.push(col(TIMESTAMP_COLUMN).gt_eq(lit_timestamp_nano(start_time)));
265        }
266
267        if let Some(end_time) = query_params.end_time {
268            filters.push(col(TIMESTAMP_COLUMN).lt_eq(lit_timestamp_nano(end_time)));
269        }
270
271        Ok(query_trace_table(
272            ctx,
273            self.catalog_manager(),
274            self.query_engine(),
275            vec![wildcard()],
276            filters,
277            vec![],
278            None,
279            None,
280            vec![],
281        )
282        .await?)
283    }
284}
285
286#[allow(clippy::too_many_arguments)]
287async fn query_trace_table(
288    ctx: QueryContextRef,
289    catalog_manager: &CatalogManagerRef,
290    query_engine: &QueryEngineRef,
291    selects: Vec<SelectExpr>,
292    filters: Vec<Expr>,
293    sorts: Vec<SortExpr>,
294    limit: Option<usize>,
295    tags: Option<HashMap<String, JsonValue>>,
296    distincts: Vec<Expr>,
297) -> ServerResult<Output> {
298    let trace_table_name = ctx
299        .extension(JAEGER_QUERY_TABLE_NAME_KEY)
300        .unwrap_or(TRACE_TABLE_NAME);
301
302    // If only select services, use the trace services table.
303    let table_name = {
304        if match selects.as_slice() {
305            [SelectExpr::Expression(x)] => x == &col(SERVICE_NAME_COLUMN),
306            _ => false,
307        } {
308            &trace_services_table_name(trace_table_name)
309        } else {
310            trace_table_name
311        }
312    };
313
314    let table = catalog_manager
315        .table(
316            ctx.current_catalog(),
317            &ctx.current_schema(),
318            table_name,
319            Some(&ctx),
320        )
321        .await
322        .context(CatalogSnafu)?
323        .with_context(|| TableNotFoundSnafu {
324            table: table_name,
325            catalog: ctx.current_catalog(),
326            schema: ctx.current_schema(),
327        })?;
328
329    let is_data_model_v1 = table
330        .table_info()
331        .meta
332        .options
333        .extra_options
334        .get(TABLE_DATA_MODEL)
335        .map(|s| s.as_str())
336        == Some(TABLE_DATA_MODEL_TRACE_V1);
337
338    let df_context = create_df_context(query_engine, ctx.clone())?;
339
340    let dataframe = df_context
341        .read_table(Arc::new(DfTableProviderAdapter::new(table)))
342        .context(DataFusionSnafu)?;
343
344    let dataframe = dataframe.select(selects).context(DataFusionSnafu)?;
345
346    // Apply all filters.
347    let dataframe = filters
348        .into_iter()
349        .chain(tags.map_or(Ok(vec![]), |t| {
350            tags_filters(&dataframe, t, is_data_model_v1)
351        })?)
352        .try_fold(dataframe, |df, expr| {
353            df.filter(expr).context(DataFusionSnafu)
354        })?;
355
356    // Apply the distinct if needed.
357    let dataframe = if !distincts.is_empty() {
358        dataframe
359            .distinct_on(distincts.clone(), distincts, None)
360            .context(DataFusionSnafu)?
361    } else {
362        dataframe
363    };
364
365    // Apply the sorts if needed.
366    let dataframe = if !sorts.is_empty() {
367        dataframe.sort(sorts).context(DataFusionSnafu)?
368    } else {
369        dataframe
370    };
371
372    // Apply the limit if needed.
373    let dataframe = if let Some(limit) = limit {
374        dataframe.limit(0, Some(limit)).context(DataFusionSnafu)?
375    } else {
376        dataframe
377    };
378
379    // Execute the query and collect the result.
380    let stream = dataframe.execute_stream().await.context(DataFusionSnafu)?;
381
382    let output = Output::new_with_stream(Box::pin(
383        RecordBatchStreamAdapter::try_new(stream).context(CollectRecordbatchSnafu)?,
384    ));
385
386    Ok(output)
387}
388
389// The current implementation registers UDFs during the planning stage, which makes it difficult
390// to utilize them through DataFrame APIs. To address this limitation, we create a new session
391// context and register the required UDFs, allowing them to be decoupled from the global context.
392// TODO(zyy17): Is it possible or necessary to reuse the existing session context?
393fn create_df_context(
394    query_engine: &QueryEngineRef,
395    ctx: QueryContextRef,
396) -> ServerResult<SessionContext> {
397    let df_context = SessionContext::new_with_state(
398        SessionStateBuilder::new_from_existing(query_engine.engine_state().session_state()).build(),
399    );
400
401    // The following JSON UDFs will be used for tags filters on v0 data model.
402    let udfs: Vec<FunctionRef> = vec![
403        Arc::new(JsonGetInt),
404        Arc::new(JsonGetFloat),
405        Arc::new(JsonGetBool),
406        Arc::new(JsonGetString),
407    ];
408
409    for udf in udfs {
410        df_context.register_udf(create_udf(
411            udf,
412            ctx.clone(),
413            Arc::new(FunctionState::default()),
414        ));
415    }
416
417    Ok(df_context)
418}
419
420fn json_tag_filters(
421    dataframe: &DataFrame,
422    tags: HashMap<String, JsonValue>,
423) -> ServerResult<Vec<Expr>> {
424    let mut filters = vec![];
425
426    // NOTE: The key of the tags may contain `.`, for example: `http.status_code`, so we need to use `["http.status_code"]` in json path to access the value.
427    for (key, value) in tags.iter() {
428        if let JsonValue::String(value) = value {
429            filters.push(
430                dataframe
431                    .registry()
432                    .udf(JsonGetString {}.name())
433                    .context(DataFusionSnafu)?
434                    .call(vec![
435                        col(SPAN_ATTRIBUTES_COLUMN),
436                        lit(format!("[\"{}\"]", key)),
437                    ])
438                    .eq(lit(value)),
439            );
440        }
441        if let JsonValue::Number(value) = value {
442            if value.is_i64() {
443                filters.push(
444                    dataframe
445                        .registry()
446                        .udf(JsonGetInt {}.name())
447                        .context(DataFusionSnafu)?
448                        .call(vec![
449                            col(SPAN_ATTRIBUTES_COLUMN),
450                            lit(format!("[\"{}\"]", key)),
451                        ])
452                        .eq(lit(value.as_i64().unwrap())),
453                );
454            }
455            if value.is_f64() {
456                filters.push(
457                    dataframe
458                        .registry()
459                        .udf(JsonGetFloat {}.name())
460                        .context(DataFusionSnafu)?
461                        .call(vec![
462                            col(SPAN_ATTRIBUTES_COLUMN),
463                            lit(format!("[\"{}\"]", key)),
464                        ])
465                        .eq(lit(value.as_f64().unwrap())),
466                );
467            }
468        }
469        if let JsonValue::Bool(value) = value {
470            filters.push(
471                dataframe
472                    .registry()
473                    .udf(JsonGetBool {}.name())
474                    .context(DataFusionSnafu)?
475                    .call(vec![
476                        col(SPAN_ATTRIBUTES_COLUMN),
477                        lit(format!("[\"{}\"]", key)),
478                    ])
479                    .eq(lit(*value)),
480            );
481        }
482    }
483
484    Ok(filters)
485}
486
487fn flatten_tag_filters(tags: HashMap<String, JsonValue>) -> ServerResult<Vec<Expr>> {
488    let filters = tags
489        .into_iter()
490        .filter_map(|(key, value)| {
491            let key = format!("\"span_attributes.{}\"", key);
492            match value {
493                JsonValue::String(value) => Some(col(key).eq(lit(value))),
494                JsonValue::Number(value) => {
495                    if value.is_f64() {
496                        // safe to unwrap as checked previously
497                        Some(col(key).eq(lit(value.as_f64().unwrap())))
498                    } else {
499                        Some(col(key).eq(lit(value.as_i64().unwrap())))
500                    }
501                }
502                JsonValue::Bool(value) => Some(col(key).eq(lit(value))),
503                JsonValue::Null => Some(col(key).is_null()),
504                // not supported at the moment
505                JsonValue::Array(_value) => None,
506                JsonValue::Object(_value) => None,
507            }
508        })
509        .collect();
510    Ok(filters)
511}
512
513fn tags_filters(
514    dataframe: &DataFrame,
515    tags: HashMap<String, JsonValue>,
516    is_data_model_v1: bool,
517) -> ServerResult<Vec<Expr>> {
518    if is_data_model_v1 {
519        flatten_tag_filters(tags)
520    } else {
521        json_tag_filters(dataframe, tags)
522    }
523}
524
525// Get trace ids from the output in recordbatches.
526async fn trace_ids_from_output(output: Output) -> ServerResult<Vec<String>> {
527    if let OutputData::Stream(stream) = output.data {
528        let schema = stream.schema().clone();
529        let recordbatches = util::collect(stream)
530            .await
531            .context(CollectRecordbatchSnafu)?;
532
533        // Only contains `trace_id` column in string type.
534        if !recordbatches.is_empty()
535            && schema.num_columns() == 1
536            && schema.contains_column(TRACE_ID_COLUMN)
537        {
538            let mut trace_ids = vec![];
539            for recordbatch in recordbatches {
540                for col in recordbatch.columns().iter() {
541                    for row_idx in 0..recordbatch.num_rows() {
542                        if let ValueRef::String(value) = col.get_ref(row_idx) {
543                            trace_ids.push(value.to_string());
544                        }
545                    }
546                }
547            }
548
549            return Ok(trace_ids);
550        }
551    }
552
553    Ok(vec![])
554}