frontend/instance/
jaeger.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::sync::Arc;
17
18use async_trait::async_trait;
19use catalog::CatalogManagerRef;
20use common_catalog::consts::{trace_services_table_name, TRACE_TABLE_NAME};
21use common_function::function::{Function, FunctionRef};
22use common_function::scalars::json::json_get::{
23    JsonGetBool, JsonGetFloat, JsonGetInt, JsonGetString,
24};
25use common_function::scalars::udf::create_udf;
26use common_function::state::FunctionState;
27use common_query::{Output, OutputData};
28use common_recordbatch::adapter::RecordBatchStreamAdapter;
29use common_recordbatch::util;
30use datafusion::dataframe::DataFrame;
31use datafusion::execution::context::SessionContext;
32use datafusion::execution::SessionStateBuilder;
33use datafusion_expr::{col, lit, lit_timestamp_nano, wildcard, Expr, SortExpr};
34use datatypes::value::ValueRef;
35use query::QueryEngineRef;
36use serde_json::Value as JsonValue;
37use servers::error::{
38    CatalogSnafu, CollectRecordbatchSnafu, DataFusionSnafu, Result as ServerResult,
39    TableNotFoundSnafu,
40};
41use servers::http::jaeger::{QueryTraceParams, JAEGER_QUERY_TABLE_NAME_KEY};
42use servers::otlp::trace::{
43    DURATION_NANO_COLUMN, SERVICE_NAME_COLUMN, SPAN_ATTRIBUTES_COLUMN, SPAN_KIND_COLUMN,
44    SPAN_KIND_PREFIX, SPAN_NAME_COLUMN, TIMESTAMP_COLUMN, TRACE_ID_COLUMN,
45};
46use servers::query_handler::JaegerQueryHandler;
47use session::context::QueryContextRef;
48use snafu::{OptionExt, ResultExt};
49use table::requests::{TABLE_DATA_MODEL, TABLE_DATA_MODEL_TRACE_V1};
50use table::table::adapter::DfTableProviderAdapter;
51
52use crate::instance::Instance;
53
/// Default maximum number of rows a Jaeger query returns when the caller does
/// not supply its own limit.
const DEFAULT_LIMIT: usize = 2000;
55
#[async_trait]
impl JaegerQueryHandler for Instance {
    /// Returns the distinct service names known to the trace store.
    ///
    /// Selecting only `service_name` routes the query to the dedicated trace
    /// services table inside `query_trace_table` (see its table-name logic).
    async fn get_services(&self, ctx: QueryContextRef) -> ServerResult<Output> {
        // It's equivalent to `SELECT DISTINCT(service_name) FROM {db}.{trace_table}`.
        Ok(query_trace_table(
            ctx,
            self.catalog_manager(),
            self.query_engine(),
            vec![col(SERVICE_NAME_COLUMN)],
            vec![],
            vec![],
            None,
            None,
            vec![col(SERVICE_NAME_COLUMN)],
        )
        .await?)
    }

    /// Returns the distinct operations (span name + span kind) of a service,
    /// optionally restricted by span kind and a time range.
    ///
    /// `start_time`/`end_time` are expected in microseconds — they are
    /// multiplied by 1_000 below before being compared against the
    /// nanosecond timestamp column.
    async fn get_operations(
        &self,
        ctx: QueryContextRef,
        service_name: &str,
        span_kind: Option<&str>,
        start_time: Option<i64>,
        end_time: Option<i64>,
    ) -> ServerResult<Output> {
        let mut filters = vec![col(SERVICE_NAME_COLUMN).eq(lit(service_name))];

        if let Some(span_kind) = span_kind {
            // Jaeger sends lowercase kinds (e.g. "server"); stored values are
            // prefixed and uppercased (e.g. "SPAN_KIND_SERVER").
            filters.push(col(SPAN_KIND_COLUMN).eq(lit(format!(
                "{}{}",
                SPAN_KIND_PREFIX,
                span_kind.to_uppercase()
            ))));
        }

        if let Some(start_time) = start_time {
            // Microseconds to nanoseconds.
            filters.push(col(TIMESTAMP_COLUMN).gt_eq(lit_timestamp_nano(start_time * 1_000)));
        }

        if let Some(end_time) = end_time {
            // Microseconds to nanoseconds.
            filters.push(col(TIMESTAMP_COLUMN).lt_eq(lit_timestamp_nano(end_time * 1_000)));
        }

        // It's equivalent to the following SQL query:
        //
        // ```
        // SELECT DISTINCT span_name, span_kind
        // FROM
        //   {db}.{trace_table}
        // WHERE
        //   service_name = '{service_name}' AND
        //   timestamp >= {start_time} AND
        //   timestamp <= {end_time} AND
        //   span_kind = '{span_kind}'
        // ORDER BY
        //   span_name ASC
        // ```.
        Ok(query_trace_table(
            ctx,
            self.catalog_manager(),
            self.query_engine(),
            vec![
                col(SPAN_NAME_COLUMN),
                col(SPAN_KIND_COLUMN),
                col(SERVICE_NAME_COLUMN),
                col(TIMESTAMP_COLUMN),
            ],
            filters,
            vec![col(SPAN_NAME_COLUMN).sort(true, false)], // Sort by span_name in ascending order.
            Some(DEFAULT_LIMIT),
            None,
            vec![col(SPAN_NAME_COLUMN), col(SPAN_KIND_COLUMN)],
        )
        .await?)
    }

    /// Returns all spans of a single trace, newest first.
    ///
    /// NOTE(review): unlike `get_operations`, `start_time`/`end_time` are
    /// passed to `lit_timestamp_nano` without a microsecond-to-nanosecond
    /// conversion, so callers appear to supply nanoseconds here — confirm
    /// against the HTTP layer.
    async fn get_trace(
        &self,
        ctx: QueryContextRef,
        trace_id: &str,
        start_time: Option<i64>,
        end_time: Option<i64>,
    ) -> ServerResult<Output> {
        // It's equivalent to the following SQL query:
        //
        // ```
        // SELECT
        //   *
        // FROM
        //   {db}.{trace_table}
        // WHERE
        //   trace_id = '{trace_id}' AND
        //   timestamp >= {start_time} AND
        //   timestamp <= {end_time}
        // ORDER BY
        //   timestamp DESC
        // ```.
        let selects = vec![wildcard()];

        let mut filters = vec![col(TRACE_ID_COLUMN).eq(lit(trace_id))];

        if let Some(start_time) = start_time {
            filters.push(col(TIMESTAMP_COLUMN).gt_eq(lit_timestamp_nano(start_time)));
        }

        if let Some(end_time) = end_time {
            filters.push(col(TIMESTAMP_COLUMN).lt_eq(lit_timestamp_nano(end_time)));
        }

        Ok(query_trace_table(
            ctx,
            self.catalog_manager(),
            self.query_engine(),
            selects,
            filters,
            vec![col(TIMESTAMP_COLUMN).sort(false, false)], // Sort by timestamp in descending order.
            Some(DEFAULT_LIMIT),
            None,
            vec![],
        )
        .await?)
    }

    /// Searches traces matching the given parameters in two phases:
    /// first collect the distinct trace ids that satisfy the filters (and
    /// optional tag predicates), then fetch all spans for those trace ids.
    async fn find_traces(
        &self,
        ctx: QueryContextRef,
        query_params: QueryTraceParams,
    ) -> ServerResult<Output> {
        let mut filters = vec![];

        // `service_name` is already validated in `from_jaeger_query_params()`, so no additional check needed here.
        filters.push(col(SERVICE_NAME_COLUMN).eq(lit(query_params.service_name)));

        if let Some(operation_name) = query_params.operation_name {
            filters.push(col(SPAN_NAME_COLUMN).eq(lit(operation_name)));
        }

        if let Some(start_time) = query_params.start_time {
            filters.push(col(TIMESTAMP_COLUMN).gt_eq(lit_timestamp_nano(start_time)));
        }

        if let Some(end_time) = query_params.end_time {
            filters.push(col(TIMESTAMP_COLUMN).lt_eq(lit_timestamp_nano(end_time)));
        }

        if let Some(min_duration) = query_params.min_duration {
            filters.push(col(DURATION_NANO_COLUMN).gt_eq(lit(min_duration)));
        }

        if let Some(max_duration) = query_params.max_duration {
            filters.push(col(DURATION_NANO_COLUMN).lt_eq(lit(max_duration)));
        }

        // Get all distinct trace ids that match the filters.
        // It's equivalent to the following SQL query:
        //
        // ```
        // SELECT DISTINCT trace_id
        // FROM
        //   {db}.{trace_table}
        // WHERE
        //   service_name = '{service_name}' AND
        //   operation_name = '{operation_name}' AND
        //   timestamp >= {start_time} AND
        //   timestamp <= {end_time} AND
        //   duration >= {min_duration} AND
        //   duration <= {max_duration}
        // LIMIT {limit}
        // ```.
        let output = query_trace_table(
            ctx.clone(),
            self.catalog_manager(),
            self.query_engine(),
            vec![wildcard()],
            filters,
            vec![],
            Some(query_params.limit.unwrap_or(DEFAULT_LIMIT)),
            query_params.tags,
            vec![col(TRACE_ID_COLUMN)],
        )
        .await?;

        // Get all traces that match the trace ids from the previous query.
        // It's equivalent to the following SQL query:
        //
        // ```
        // SELECT *
        // FROM
        //   {db}.{trace_table}
        // WHERE
        //   trace_id IN ({trace_ids}) AND
        //   timestamp >= {start_time} AND
        //   timestamp <= {end_time}
        // ```
        let mut filters = vec![col(TRACE_ID_COLUMN).in_list(
            trace_ids_from_output(output)
                .await?
                .iter()
                .map(lit)
                .collect::<Vec<Expr>>(),
            false,
        )];

        // Re-apply the time range so the second query can prune partitions
        // instead of scanning every span of the matched traces.
        if let Some(start_time) = query_params.start_time {
            filters.push(col(TIMESTAMP_COLUMN).gt_eq(lit_timestamp_nano(start_time)));
        }

        if let Some(end_time) = query_params.end_time {
            filters.push(col(TIMESTAMP_COLUMN).lt_eq(lit_timestamp_nano(end_time)));
        }

        Ok(query_trace_table(
            ctx,
            self.catalog_manager(),
            self.query_engine(),
            vec![wildcard()],
            filters,
            vec![],
            None,
            None,
            vec![],
        )
        .await?)
    }
}
284
/// Builds and executes a DataFusion query against the trace table (or the
/// trace services table) and returns the result as a record-batch stream.
///
/// Pipeline order: select -> filters (incl. tag filters) -> distinct -> sort
/// -> limit. The table name comes from the `JAEGER_QUERY_TABLE_NAME_KEY`
/// context extension, falling back to the default trace table name.
///
/// # Errors
/// Returns an error when the table cannot be resolved from the catalog, or
/// when any DataFusion planning/execution step fails.
#[allow(clippy::too_many_arguments)]
async fn query_trace_table(
    ctx: QueryContextRef,
    catalog_manager: &CatalogManagerRef,
    query_engine: &QueryEngineRef,
    selects: Vec<Expr>,
    filters: Vec<Expr>,
    sorts: Vec<SortExpr>,
    limit: Option<usize>,
    tags: Option<HashMap<String, JsonValue>>,
    distincts: Vec<Expr>,
) -> ServerResult<Output> {
    let trace_table_name = ctx
        .extension(JAEGER_QUERY_TABLE_NAME_KEY)
        .unwrap_or(TRACE_TABLE_NAME);

    // If only select services, use the trace services table.
    let table_name = {
        if selects.len() == 1 && selects[0] == col(SERVICE_NAME_COLUMN) {
            &trace_services_table_name(trace_table_name)
        } else {
            trace_table_name
        }
    };

    let table = catalog_manager
        .table(
            ctx.current_catalog(),
            &ctx.current_schema(),
            table_name,
            Some(&ctx),
        )
        .await
        .context(CatalogSnafu)?
        .with_context(|| TableNotFoundSnafu {
            table: table_name,
            catalog: ctx.current_catalog(),
            schema: ctx.current_schema(),
        })?;

    // Tag filtering strategy depends on the data model: v1 stores attributes
    // as flattened columns, v0 keeps them in a JSON column.
    let is_data_model_v1 = table
        .table_info()
        .meta
        .options
        .extra_options
        .get(TABLE_DATA_MODEL)
        .map(|s| s.as_str())
        == Some(TABLE_DATA_MODEL_TRACE_V1);

    // A dedicated session context with the JSON UDFs registered, so tag
    // filters can be expressed through the DataFrame API.
    let df_context = create_df_context(query_engine, ctx.clone())?;

    let dataframe = df_context
        .read_table(Arc::new(DfTableProviderAdapter::new(table)))
        .context(DataFusionSnafu)?;

    let dataframe = dataframe.select(selects).context(DataFusionSnafu)?;

    // Apply all filters.
    let dataframe = filters
        .into_iter()
        .chain(tags.map_or(Ok(vec![]), |t| {
            tags_filters(&dataframe, t, is_data_model_v1)
        })?)
        .try_fold(dataframe, |df, expr| {
            df.filter(expr).context(DataFusionSnafu)
        })?;

    // Apply the distinct if needed.
    let dataframe = if !distincts.is_empty() {
        dataframe
            .distinct_on(distincts.clone(), distincts, None)
            .context(DataFusionSnafu)?
    } else {
        dataframe
    };

    // Apply the sorts if needed.
    let dataframe = if !sorts.is_empty() {
        dataframe.sort(sorts).context(DataFusionSnafu)?
    } else {
        dataframe
    };

    // Apply the limit if needed.
    let dataframe = if let Some(limit) = limit {
        dataframe.limit(0, Some(limit)).context(DataFusionSnafu)?
    } else {
        dataframe
    };

    // Execute the query and collect the result.
    let stream = dataframe.execute_stream().await.context(DataFusionSnafu)?;

    let output = Output::new_with_stream(Box::pin(
        RecordBatchStreamAdapter::try_new(stream).context(CollectRecordbatchSnafu)?,
    ));

    Ok(output)
}
384
385// The current implementation registers UDFs during the planning stage, which makes it difficult
386// to utilize them through DataFrame APIs. To address this limitation, we create a new session
387// context and register the required UDFs, allowing them to be decoupled from the global context.
388// TODO(zyy17): Is it possible or necessary to reuse the existing session context?
389fn create_df_context(
390    query_engine: &QueryEngineRef,
391    ctx: QueryContextRef,
392) -> ServerResult<SessionContext> {
393    let df_context = SessionContext::new_with_state(
394        SessionStateBuilder::new_from_existing(query_engine.engine_state().session_state()).build(),
395    );
396
397    // The following JSON UDFs will be used for tags filters on v0 data model.
398    let udfs: Vec<FunctionRef> = vec![
399        Arc::new(JsonGetInt),
400        Arc::new(JsonGetFloat),
401        Arc::new(JsonGetBool),
402        Arc::new(JsonGetString),
403    ];
404
405    for udf in udfs {
406        df_context.register_udf(create_udf(
407            udf,
408            ctx.clone(),
409            Arc::new(FunctionState::default()),
410        ));
411    }
412
413    Ok(df_context)
414}
415
416fn json_tag_filters(
417    dataframe: &DataFrame,
418    tags: HashMap<String, JsonValue>,
419) -> ServerResult<Vec<Expr>> {
420    let mut filters = vec![];
421
422    // NOTE: The key of the tags may contain `.`, for example: `http.status_code`, so we need to use `["http.status_code"]` in json path to access the value.
423    for (key, value) in tags.iter() {
424        if let JsonValue::String(value) = value {
425            filters.push(
426                dataframe
427                    .registry()
428                    .udf(JsonGetString {}.name())
429                    .context(DataFusionSnafu)?
430                    .call(vec![
431                        col(SPAN_ATTRIBUTES_COLUMN),
432                        lit(format!("[\"{}\"]", key)),
433                    ])
434                    .eq(lit(value)),
435            );
436        }
437        if let JsonValue::Number(value) = value {
438            if value.is_i64() {
439                filters.push(
440                    dataframe
441                        .registry()
442                        .udf(JsonGetInt {}.name())
443                        .context(DataFusionSnafu)?
444                        .call(vec![
445                            col(SPAN_ATTRIBUTES_COLUMN),
446                            lit(format!("[\"{}\"]", key)),
447                        ])
448                        .eq(lit(value.as_i64().unwrap())),
449                );
450            }
451            if value.is_f64() {
452                filters.push(
453                    dataframe
454                        .registry()
455                        .udf(JsonGetFloat {}.name())
456                        .context(DataFusionSnafu)?
457                        .call(vec![
458                            col(SPAN_ATTRIBUTES_COLUMN),
459                            lit(format!("[\"{}\"]", key)),
460                        ])
461                        .eq(lit(value.as_f64().unwrap())),
462                );
463            }
464        }
465        if let JsonValue::Bool(value) = value {
466            filters.push(
467                dataframe
468                    .registry()
469                    .udf(JsonGetBool {}.name())
470                    .context(DataFusionSnafu)?
471                    .call(vec![
472                        col(SPAN_ATTRIBUTES_COLUMN),
473                        lit(format!("[\"{}\"]", key)),
474                    ])
475                    .eq(lit(*value)),
476            );
477        }
478    }
479
480    Ok(filters)
481}
482
483fn flatten_tag_filters(tags: HashMap<String, JsonValue>) -> ServerResult<Vec<Expr>> {
484    let filters = tags
485        .into_iter()
486        .filter_map(|(key, value)| {
487            let key = format!("\"span_attributes.{}\"", key);
488            match value {
489                JsonValue::String(value) => Some(col(key).eq(lit(value))),
490                JsonValue::Number(value) => {
491                    if value.is_f64() {
492                        // safe to unwrap as checked previously
493                        Some(col(key).eq(lit(value.as_f64().unwrap())))
494                    } else {
495                        Some(col(key).eq(lit(value.as_i64().unwrap())))
496                    }
497                }
498                JsonValue::Bool(value) => Some(col(key).eq(lit(value))),
499                JsonValue::Null => Some(col(key).is_null()),
500                // not supported at the moment
501                JsonValue::Array(_value) => None,
502                JsonValue::Object(_value) => None,
503            }
504        })
505        .collect();
506    Ok(filters)
507}
508
509fn tags_filters(
510    dataframe: &DataFrame,
511    tags: HashMap<String, JsonValue>,
512    is_data_model_v1: bool,
513) -> ServerResult<Vec<Expr>> {
514    if is_data_model_v1 {
515        flatten_tag_filters(tags)
516    } else {
517        json_tag_filters(dataframe, tags)
518    }
519}
520
521// Get trace ids from the output in recordbatches.
522async fn trace_ids_from_output(output: Output) -> ServerResult<Vec<String>> {
523    if let OutputData::Stream(stream) = output.data {
524        let schema = stream.schema().clone();
525        let recordbatches = util::collect(stream)
526            .await
527            .context(CollectRecordbatchSnafu)?;
528
529        // Only contains `trace_id` column in string type.
530        if !recordbatches.is_empty()
531            && schema.num_columns() == 1
532            && schema.contains_column(TRACE_ID_COLUMN)
533        {
534            let mut trace_ids = vec![];
535            for recordbatch in recordbatches {
536                for col in recordbatch.columns().iter() {
537                    for row_idx in 0..recordbatch.num_rows() {
538                        if let ValueRef::String(value) = col.get_ref(row_idx) {
539                            trace_ids.push(value.to_string());
540                        }
541                    }
542                }
543            }
544
545            return Ok(trace_ids);
546        }
547    }
548
549    Ok(vec![])
550}