frontend/instance/
jaeger.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
use std::collections::HashMap;
use std::sync::Arc;

use async_trait::async_trait;
use catalog::CatalogManagerRef;
use common_catalog::consts::{
    TRACE_TABLE_NAME, trace_operations_table_name, trace_services_table_name,
};
use common_function::function::FunctionRef;
use common_function::scalars::json::json_get::{
    JsonGetBool, JsonGetFloat, JsonGetInt, JsonGetString,
};
use common_function::scalars::udf::create_udf;
use common_query::{Output, OutputData};
use common_recordbatch::adapter::RecordBatchStreamAdapter;
use common_recordbatch::util;
use datafusion::dataframe::DataFrame;
use datafusion::execution::SessionStateBuilder;
use datafusion::execution::context::SessionContext;
use datafusion_expr::select_expr::SelectExpr;
use datafusion_expr::{Expr, SortExpr, col, ident, lit, lit_timestamp_nano, wildcard};
use datatypes::value::ValueRef;
use query::QueryEngineRef;
use serde_json::Value as JsonValue;
use servers::error::{
    CatalogSnafu, CollectRecordbatchSnafu, DataFusionSnafu, Result as ServerResult,
    TableNotFoundSnafu,
};
use servers::http::jaeger::{JAEGER_QUERY_TABLE_NAME_KEY, QueryTraceParams};
use servers::otlp::trace::{
    DURATION_NANO_COLUMN, SERVICE_NAME_COLUMN, SPAN_ATTRIBUTES_COLUMN, SPAN_KIND_COLUMN,
    SPAN_KIND_PREFIX, SPAN_NAME_COLUMN, TIMESTAMP_COLUMN, TRACE_ID_COLUMN,
};
use servers::query_handler::JaegerQueryHandler;
use session::context::QueryContextRef;
use snafu::{OptionExt, ResultExt};
use table::requests::{TABLE_DATA_MODEL, TABLE_DATA_MODEL_TRACE_V1};
use table::table::adapter::DfTableProviderAdapter;

use crate::instance::Instance;
55
/// Default row limit for the Jaeger queries: always applied to operation and trace lookups,
/// and used by `find_traces` when the request carries no explicit limit.
const DEFAULT_LIMIT: usize = 2_000;
57
58#[async_trait]
59impl JaegerQueryHandler for Instance {
60    async fn get_services(&self, ctx: QueryContextRef) -> ServerResult<Output> {
61        // It's equivalent to `SELECT DISTINCT(service_name) FROM {db}.{trace_table}`.
62        Ok(query_trace_table(
63            ctx,
64            self.catalog_manager(),
65            self.query_engine(),
66            vec![SelectExpr::from(col(SERVICE_NAME_COLUMN))],
67            vec![],
68            vec![],
69            None,
70            None,
71            vec![col(SERVICE_NAME_COLUMN)],
72        )
73        .await?)
74    }
75
76    async fn get_operations(
77        &self,
78        ctx: QueryContextRef,
79        service_name: &str,
80        span_kind: Option<&str>,
81    ) -> ServerResult<Output> {
82        let mut filters = vec![col(SERVICE_NAME_COLUMN).eq(lit(service_name))];
83
84        if let Some(span_kind) = span_kind {
85            filters.push(col(SPAN_KIND_COLUMN).eq(lit(format!(
86                "{}{}",
87                SPAN_KIND_PREFIX,
88                span_kind.to_uppercase()
89            ))));
90        }
91
92        // It's equivalent to the following SQL query:
93        //
94        // ```
95        // SELECT DISTINCT span_name, span_kind
96        // FROM
97        //   {db}.{trace_table}
98        // WHERE
99        //   service_name = '{service_name}' AND
100        //   span_kind = '{span_kind}'
101        // ORDER BY
102        //   span_name ASC
103        // ```.
104        Ok(query_trace_table(
105            ctx,
106            self.catalog_manager(),
107            self.query_engine(),
108            vec![
109                SelectExpr::from(col(SPAN_NAME_COLUMN)),
110                SelectExpr::from(col(SPAN_KIND_COLUMN)),
111                SelectExpr::from(col(SERVICE_NAME_COLUMN)),
112                SelectExpr::from(col(TIMESTAMP_COLUMN)),
113            ],
114            filters,
115            vec![col(SPAN_NAME_COLUMN).sort(true, false)], // Sort by span_name in ascending order.
116            Some(DEFAULT_LIMIT),
117            None,
118            vec![col(SPAN_NAME_COLUMN), col(SPAN_KIND_COLUMN)],
119        )
120        .await?)
121    }
122
123    async fn get_trace(
124        &self,
125        ctx: QueryContextRef,
126        trace_id: &str,
127        start_time: Option<i64>,
128        end_time: Option<i64>,
129    ) -> ServerResult<Output> {
130        // It's equivalent to the following SQL query:
131        //
132        // ```
133        // SELECT
134        //   *
135        // FROM
136        //   {db}.{trace_table}
137        // WHERE
138        //   trace_id = '{trace_id}' AND
139        //   timestamp >= {start_time} AND
140        //   timestamp <= {end_time}
141        // ORDER BY
142        //   timestamp DESC
143        // ```.
144        let selects = vec![wildcard()];
145
146        let mut filters = vec![col(TRACE_ID_COLUMN).eq(lit(trace_id))];
147
148        if let Some(start_time) = start_time {
149            filters.push(col(TIMESTAMP_COLUMN).gt_eq(lit_timestamp_nano(start_time)));
150        }
151
152        if let Some(end_time) = end_time {
153            filters.push(col(TIMESTAMP_COLUMN).lt_eq(lit_timestamp_nano(end_time)));
154        }
155
156        Ok(query_trace_table(
157            ctx,
158            self.catalog_manager(),
159            self.query_engine(),
160            selects,
161            filters,
162            vec![col(TIMESTAMP_COLUMN).sort(false, false)], // Sort by timestamp in descending order.
163            Some(DEFAULT_LIMIT),
164            None,
165            vec![],
166        )
167        .await?)
168    }
169
170    async fn find_traces(
171        &self,
172        ctx: QueryContextRef,
173        query_params: QueryTraceParams,
174    ) -> ServerResult<Output> {
175        let mut filters = vec![];
176
177        // `service_name` is already validated in `from_jaeger_query_params()`, so no additional check needed here.
178        filters.push(col(SERVICE_NAME_COLUMN).eq(lit(query_params.service_name)));
179
180        if let Some(operation_name) = query_params.operation_name {
181            filters.push(col(SPAN_NAME_COLUMN).eq(lit(operation_name)));
182        }
183
184        if let Some(start_time) = query_params.start_time {
185            filters.push(col(TIMESTAMP_COLUMN).gt_eq(lit_timestamp_nano(start_time)));
186        }
187
188        if let Some(end_time) = query_params.end_time {
189            filters.push(col(TIMESTAMP_COLUMN).lt_eq(lit_timestamp_nano(end_time)));
190        }
191
192        if let Some(min_duration) = query_params.min_duration {
193            filters.push(col(DURATION_NANO_COLUMN).gt_eq(lit(min_duration)));
194        }
195
196        if let Some(max_duration) = query_params.max_duration {
197            filters.push(col(DURATION_NANO_COLUMN).lt_eq(lit(max_duration)));
198        }
199
200        // Get all distinct trace ids that match the filters.
201        // It's equivalent to the following SQL query:
202        //
203        // ```
204        // SELECT DISTINCT trace_id
205        // FROM
206        //   {db}.{trace_table}
207        // WHERE
208        //   service_name = '{service_name}' AND
209        //   operation_name = '{operation_name}' AND
210        //   timestamp >= {start_time} AND
211        //   timestamp <= {end_time} AND
212        //   duration >= {min_duration} AND
213        //   duration <= {max_duration}
214        // LIMIT {limit}
215        // ```.
216        let output = query_trace_table(
217            ctx.clone(),
218            self.catalog_manager(),
219            self.query_engine(),
220            vec![wildcard()],
221            filters,
222            vec![],
223            Some(query_params.limit.unwrap_or(DEFAULT_LIMIT)),
224            query_params.tags,
225            vec![col(TRACE_ID_COLUMN)],
226        )
227        .await?;
228
229        // Get all traces that match the trace ids from the previous query.
230        // It's equivalent to the following SQL query:
231        //
232        // ```
233        // SELECT *
234        // FROM
235        //   {db}.{trace_table}
236        // WHERE
237        //   trace_id IN ({trace_ids}) AND
238        //   timestamp >= {start_time} AND
239        //   timestamp <= {end_time}
240        // ```
241        let mut filters = vec![
242            col(TRACE_ID_COLUMN).in_list(
243                trace_ids_from_output(output)
244                    .await?
245                    .iter()
246                    .map(lit)
247                    .collect::<Vec<Expr>>(),
248                false,
249            ),
250        ];
251
252        if let Some(start_time) = query_params.start_time {
253            filters.push(col(TIMESTAMP_COLUMN).gt_eq(lit_timestamp_nano(start_time)));
254        }
255
256        if let Some(end_time) = query_params.end_time {
257            filters.push(col(TIMESTAMP_COLUMN).lt_eq(lit_timestamp_nano(end_time)));
258        }
259
260        Ok(query_trace_table(
261            ctx,
262            self.catalog_manager(),
263            self.query_engine(),
264            vec![wildcard()],
265            filters,
266            vec![],
267            None,
268            None,
269            vec![],
270        )
271        .await?)
272    }
273}
274
275#[allow(clippy::too_many_arguments)]
276async fn query_trace_table(
277    ctx: QueryContextRef,
278    catalog_manager: &CatalogManagerRef,
279    query_engine: &QueryEngineRef,
280    selects: Vec<SelectExpr>,
281    filters: Vec<Expr>,
282    sorts: Vec<SortExpr>,
283    limit: Option<usize>,
284    tags: Option<HashMap<String, JsonValue>>,
285    distincts: Vec<Expr>,
286) -> ServerResult<Output> {
287    let trace_table_name = ctx
288        .extension(JAEGER_QUERY_TABLE_NAME_KEY)
289        .unwrap_or(TRACE_TABLE_NAME);
290
291    // If only select services, use the trace services table.
292    // If querying operations (distinct by span_name and span_kind), use the trace operations table.
293    let table_name = {
294        if match selects.as_slice() {
295            [SelectExpr::Expression(x)] => x == &col(SERVICE_NAME_COLUMN),
296            _ => false,
297        } {
298            &trace_services_table_name(trace_table_name)
299        } else if !distincts.is_empty()
300            && distincts.contains(&col(SPAN_NAME_COLUMN))
301            && distincts.contains(&col(SPAN_KIND_COLUMN))
302        {
303            &trace_operations_table_name(trace_table_name)
304        } else {
305            trace_table_name
306        }
307    };
308
309    let table = catalog_manager
310        .table(
311            ctx.current_catalog(),
312            &ctx.current_schema(),
313            table_name,
314            Some(&ctx),
315        )
316        .await
317        .context(CatalogSnafu)?
318        .with_context(|| TableNotFoundSnafu {
319            table: table_name,
320            catalog: ctx.current_catalog(),
321            schema: ctx.current_schema(),
322        })?;
323
324    let is_data_model_v1 = table
325        .table_info()
326        .meta
327        .options
328        .extra_options
329        .get(TABLE_DATA_MODEL)
330        .map(|s| s.as_str())
331        == Some(TABLE_DATA_MODEL_TRACE_V1);
332
333    let df_context = create_df_context(query_engine)?;
334
335    let dataframe = df_context
336        .read_table(Arc::new(DfTableProviderAdapter::new(table)))
337        .context(DataFusionSnafu)?;
338
339    let dataframe = dataframe.select(selects).context(DataFusionSnafu)?;
340
341    // Apply all filters.
342    let dataframe = filters
343        .into_iter()
344        .chain(tags.map_or(Ok(vec![]), |t| {
345            tags_filters(&dataframe, t, is_data_model_v1)
346        })?)
347        .try_fold(dataframe, |df, expr| {
348            df.filter(expr).context(DataFusionSnafu)
349        })?;
350
351    // Apply the distinct if needed.
352    let dataframe = if !distincts.is_empty() {
353        dataframe
354            .distinct_on(distincts.clone(), distincts, None)
355            .context(DataFusionSnafu)?
356    } else {
357        dataframe
358    };
359
360    // Apply the sorts if needed.
361    let dataframe = if !sorts.is_empty() {
362        dataframe.sort(sorts).context(DataFusionSnafu)?
363    } else {
364        dataframe
365    };
366
367    // Apply the limit if needed.
368    let dataframe = if let Some(limit) = limit {
369        dataframe.limit(0, Some(limit)).context(DataFusionSnafu)?
370    } else {
371        dataframe
372    };
373
374    // Execute the query and collect the result.
375    let stream = dataframe.execute_stream().await.context(DataFusionSnafu)?;
376
377    let output = Output::new_with_stream(Box::pin(
378        RecordBatchStreamAdapter::try_new(stream).context(CollectRecordbatchSnafu)?,
379    ));
380
381    Ok(output)
382}
383
384// The current implementation registers UDFs during the planning stage, which makes it difficult
385// to utilize them through DataFrame APIs. To address this limitation, we create a new session
386// context and register the required UDFs, allowing them to be decoupled from the global context.
387// TODO(zyy17): Is it possible or necessary to reuse the existing session context?
388fn create_df_context(query_engine: &QueryEngineRef) -> ServerResult<SessionContext> {
389    let df_context = SessionContext::new_with_state(
390        SessionStateBuilder::new_from_existing(query_engine.engine_state().session_state()).build(),
391    );
392
393    // The following JSON UDFs will be used for tags filters on v0 data model.
394    let udfs: Vec<FunctionRef> = vec![
395        Arc::new(JsonGetInt::default()),
396        Arc::new(JsonGetFloat::default()),
397        Arc::new(JsonGetBool::default()),
398        Arc::new(JsonGetString::default()),
399    ];
400
401    for udf in udfs {
402        df_context.register_udf(create_udf(udf));
403    }
404
405    Ok(df_context)
406}
407
408fn json_tag_filters(
409    dataframe: &DataFrame,
410    tags: HashMap<String, JsonValue>,
411) -> ServerResult<Vec<Expr>> {
412    let mut filters = vec![];
413
414    // NOTE: The key of the tags may contain `.`, for example: `http.status_code`, so we need to use `["http.status_code"]` in json path to access the value.
415    for (key, value) in tags.iter() {
416        if let JsonValue::String(value) = value {
417            filters.push(
418                dataframe
419                    .registry()
420                    .udf(JsonGetString::NAME)
421                    .context(DataFusionSnafu)?
422                    .call(vec![
423                        col(SPAN_ATTRIBUTES_COLUMN),
424                        lit(format!("[\"{}\"]", key)),
425                    ])
426                    .eq(lit(value)),
427            );
428        }
429        if let JsonValue::Number(value) = value {
430            if value.is_i64() {
431                filters.push(
432                    dataframe
433                        .registry()
434                        .udf(JsonGetInt::NAME)
435                        .context(DataFusionSnafu)?
436                        .call(vec![
437                            col(SPAN_ATTRIBUTES_COLUMN),
438                            lit(format!("[\"{}\"]", key)),
439                        ])
440                        .eq(lit(value.as_i64().unwrap())),
441                );
442            }
443            if value.is_f64() {
444                filters.push(
445                    dataframe
446                        .registry()
447                        .udf(JsonGetFloat::NAME)
448                        .context(DataFusionSnafu)?
449                        .call(vec![
450                            col(SPAN_ATTRIBUTES_COLUMN),
451                            lit(format!("[\"{}\"]", key)),
452                        ])
453                        .eq(lit(value.as_f64().unwrap())),
454                );
455            }
456        }
457        if let JsonValue::Bool(value) = value {
458            filters.push(
459                dataframe
460                    .registry()
461                    .udf(JsonGetBool::NAME)
462                    .context(DataFusionSnafu)?
463                    .call(vec![
464                        col(SPAN_ATTRIBUTES_COLUMN),
465                        lit(format!("[\"{}\"]", key)),
466                    ])
467                    .eq(lit(*value)),
468            );
469        }
470    }
471
472    Ok(filters)
473}
474
475fn flatten_tag_filters(tags: HashMap<String, JsonValue>) -> ServerResult<Vec<Expr>> {
476    let filters = tags
477        .into_iter()
478        .filter_map(|(key, value)| {
479            let key = format!("\"span_attributes.{}\"", key);
480            match value {
481                JsonValue::String(value) => Some(col(key).eq(lit(value))),
482                JsonValue::Number(value) => {
483                    if value.is_f64() {
484                        // safe to unwrap as checked previously
485                        Some(col(key).eq(lit(value.as_f64().unwrap())))
486                    } else {
487                        Some(col(key).eq(lit(value.as_i64().unwrap())))
488                    }
489                }
490                JsonValue::Bool(value) => Some(col(key).eq(lit(value))),
491                JsonValue::Null => Some(col(key).is_null()),
492                // not supported at the moment
493                JsonValue::Array(_value) => None,
494                JsonValue::Object(_value) => None,
495            }
496        })
497        .collect();
498    Ok(filters)
499}
500
501fn tags_filters(
502    dataframe: &DataFrame,
503    tags: HashMap<String, JsonValue>,
504    is_data_model_v1: bool,
505) -> ServerResult<Vec<Expr>> {
506    if is_data_model_v1 {
507        flatten_tag_filters(tags)
508    } else {
509        json_tag_filters(dataframe, tags)
510    }
511}
512
513// Get trace ids from the output in recordbatches.
514async fn trace_ids_from_output(output: Output) -> ServerResult<Vec<String>> {
515    if let OutputData::Stream(stream) = output.data {
516        let schema = stream.schema().clone();
517        let recordbatches = util::collect(stream)
518            .await
519            .context(CollectRecordbatchSnafu)?;
520
521        // Only contains `trace_id` column in string type.
522        if !recordbatches.is_empty()
523            && schema.num_columns() == 1
524            && schema.contains_column(TRACE_ID_COLUMN)
525        {
526            let mut trace_ids = vec![];
527            for recordbatch in recordbatches {
528                for col in recordbatch.columns().iter() {
529                    for row_idx in 0..recordbatch.num_rows() {
530                        if let ValueRef::String(value) = col.get_ref(row_idx) {
531                            trace_ids.push(value.to_string());
532                        }
533                    }
534                }
535            }
536
537            return Ok(trace_ids);
538        }
539    }
540
541    Ok(vec![])
542}