frontend/instance/jaeger.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::sync::Arc;

use async_trait::async_trait;
use catalog::CatalogManagerRef;
use common_catalog::consts::{trace_services_table_name, TRACE_TABLE_NAME};
use common_function::function::{Function, FunctionRef};
use common_function::scalars::json::json_get::{
    JsonGetBool, JsonGetFloat, JsonGetInt, JsonGetString,
};
use common_function::scalars::udf::create_udf;
use common_function::state::FunctionState;
use common_query::Output;
use common_recordbatch::adapter::RecordBatchStreamAdapter;
use datafusion::dataframe::DataFrame;
use datafusion::execution::context::SessionContext;
use datafusion::execution::SessionStateBuilder;
use datafusion_expr::{col, lit, lit_timestamp_nano, wildcard, Expr, SortExpr};
use query::QueryEngineRef;
use serde_json::Value as JsonValue;
use servers::error::{
    CatalogSnafu, CollectRecordbatchSnafu, DataFusionSnafu, Result as ServerResult,
    TableNotFoundSnafu,
};
use servers::http::jaeger::{QueryTraceParams, JAEGER_QUERY_TABLE_NAME_KEY};
use servers::otlp::trace::{
    DURATION_NANO_COLUMN, SERVICE_NAME_COLUMN, SPAN_ATTRIBUTES_COLUMN, SPAN_KIND_COLUMN,
    SPAN_KIND_PREFIX, SPAN_NAME_COLUMN, TIMESTAMP_COLUMN, TRACE_ID_COLUMN,
};
use servers::query_handler::JaegerQueryHandler;
use session::context::QueryContextRef;
use snafu::{OptionExt, ResultExt};
use table::requests::{TABLE_DATA_MODEL, TABLE_DATA_MODEL_TRACE_V1};
use table::table::adapter::DfTableProviderAdapter;

use crate::instance::Instance;

const DEFAULT_LIMIT: usize = 2000;

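// Serves the Jaeger query APIs (services, operations, trace lookup and trace search) by
// translating each request into a DataFusion query over the stored trace data.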
#[async_trait]
impl JaegerQueryHandler for Instance {
    async fn get_services(&self, ctx: QueryContextRef) -> ServerResult<Output> {
        // It's equivalent to `SELECT DISTINCT(service_name) FROM {db}.{trace_table}`.
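        // Since only `service_name` is selected, `query_trace_table` routes the query to the
        // dedicated trace services table instead of scanning the full trace table.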
        Ok(query_trace_table(
            ctx,
            self.catalog_manager(),
            self.query_engine(),
            vec![col(SERVICE_NAME_COLUMN)],
            vec![],
            vec![],
            None,
            None,
            vec![col(SERVICE_NAME_COLUMN)],
        )
        .await?)
    }

    async fn get_operations(
        &self,
        ctx: QueryContextRef,
        service_name: &str,
        span_kind: Option<&str>,
        start_time: Option<i64>,
        end_time: Option<i64>,
    ) -> ServerResult<Output> {
        let mut filters = vec![col(SERVICE_NAME_COLUMN).eq(lit(service_name))];

        if let Some(span_kind) = span_kind {
            filters.push(col(SPAN_KIND_COLUMN).eq(lit(format!(
                "{}{}",
                SPAN_KIND_PREFIX,
                span_kind.to_uppercase()
            ))));
        }

        if let Some(start_time) = start_time {
            // Microseconds to nanoseconds.
            filters.push(col(TIMESTAMP_COLUMN).gt_eq(lit_timestamp_nano(start_time * 1_000)));
        }

        if let Some(end_time) = end_time {
            // Microseconds to nanoseconds.
            filters.push(col(TIMESTAMP_COLUMN).lt_eq(lit_timestamp_nano(end_time * 1_000)));
        }

        // It's equivalent to
        //
        // ```
        // SELECT DISTINCT span_name, span_kind
        // FROM
        //   {db}.{trace_table}
        // WHERE
        //   service_name = '{service_name}' AND
        //   timestamp >= {start_time} AND
        //   timestamp <= {end_time} AND
        //   span_kind = '{span_kind}'
        // ORDER BY
        //   span_name ASC
        // ```.
        Ok(query_trace_table(
            ctx,
            self.catalog_manager(),
            self.query_engine(),
            vec![
                col(SPAN_NAME_COLUMN),
                col(SPAN_KIND_COLUMN),
                col(SERVICE_NAME_COLUMN),
                col(TIMESTAMP_COLUMN),
            ],
            filters,
            vec![col(SPAN_NAME_COLUMN).sort(true, false)], // Sort by span_name in ascending order.
            Some(DEFAULT_LIMIT),
            None,
            vec![col(SPAN_NAME_COLUMN), col(SPAN_KIND_COLUMN)],
        )
        .await?)
    }

    async fn get_trace(&self, ctx: QueryContextRef, trace_id: &str) -> ServerResult<Output> {
        // It's equivalent to
        //
        // ```
        // SELECT
        //   *
        // FROM
        //   {db}.{trace_table}
        // WHERE
        //   trace_id = '{trace_id}'
        // ORDER BY
        //   timestamp DESC
        // ```.
        let selects = vec![wildcard()];

        let filters = vec![col(TRACE_ID_COLUMN).eq(lit(trace_id))];

        Ok(query_trace_table(
            ctx,
            self.catalog_manager(),
            self.query_engine(),
            selects,
            filters,
            vec![col(TIMESTAMP_COLUMN).sort(false, false)], // Sort by timestamp in descending order.
            Some(DEFAULT_LIMIT),
            None,
            vec![],
        )
        .await?)
    }

    async fn find_traces(
        &self,
        ctx: QueryContextRef,
        query_params: QueryTraceParams,
    ) -> ServerResult<Output> {
        let selects = vec![wildcard()];

        let mut filters = vec![];

        if let Some(operation_name) = query_params.operation_name {
            filters.push(col(SPAN_NAME_COLUMN).eq(lit(operation_name)));
        }

        if let Some(start_time) = query_params.start_time {
            filters.push(col(TIMESTAMP_COLUMN).gt_eq(lit_timestamp_nano(start_time)));
        }

        if let Some(end_time) = query_params.end_time {
            filters.push(col(TIMESTAMP_COLUMN).lt_eq(lit_timestamp_nano(end_time)));
        }

        if let Some(min_duration) = query_params.min_duration {
            filters.push(col(DURATION_NANO_COLUMN).gt_eq(lit(min_duration)));
        }

        if let Some(max_duration) = query_params.max_duration {
            filters.push(col(DURATION_NANO_COLUMN).lt_eq(lit(max_duration)));
        }

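        // Roughly equivalent to
        //
        // ```
        // SELECT
        //   *
        // FROM
        //   {db}.{trace_table}
        // WHERE
        //   span_name = '{operation_name}' AND
        //   timestamp >= {start_time} AND
        //   timestamp <= {end_time} AND
        //   duration_nano >= {min_duration} AND
        //   duration_nano <= {max_duration}
        // ORDER BY
        //   timestamp DESC
        // ```
        //
        // with each condition applied only when the corresponding query parameter is present,
        // plus any tag filters built by `tags_filters`.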
        Ok(query_trace_table(
            ctx,
            self.catalog_manager(),
            self.query_engine(),
            selects,
            filters,
            vec![col(TIMESTAMP_COLUMN).sort(false, false)], // Sort by timestamp in descending order.
            Some(DEFAULT_LIMIT),
            query_params.tags,
            vec![],
        )
        .await?)
    }
}

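/// Builds and executes a DataFusion query against the trace table (or the dedicated trace
/// services table when only `service_name` is selected), applying the given projections,
/// filters, sorts, optional limit, optional Jaeger tag filters and optional DISTINCT columns.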
#[allow(clippy::too_many_arguments)]
async fn query_trace_table(
    ctx: QueryContextRef,
    catalog_manager: &CatalogManagerRef,
    query_engine: &QueryEngineRef,
    selects: Vec<Expr>,
    filters: Vec<Expr>,
    sorts: Vec<SortExpr>,
    limit: Option<usize>,
    tags: Option<HashMap<String, JsonValue>>,
    distincts: Vec<Expr>,
) -> ServerResult<Output> {
    let trace_table_name = ctx
        .extension(JAEGER_QUERY_TABLE_NAME_KEY)
        .unwrap_or(TRACE_TABLE_NAME);

    // If only the service name is selected, query the dedicated trace services table instead.
    let table_name = {
        if selects.len() == 1 && selects[0] == col(SERVICE_NAME_COLUMN) {
            &trace_services_table_name(trace_table_name)
        } else {
            trace_table_name
        }
    };

    let table = catalog_manager
        .table(
            ctx.current_catalog(),
            &ctx.current_schema(),
            table_name,
            Some(&ctx),
        )
        .await
        .context(CatalogSnafu)?
        .with_context(|| TableNotFoundSnafu {
            table: table_name,
            catalog: ctx.current_catalog(),
            schema: ctx.current_schema(),
        })?;

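    // If the table's `TABLE_DATA_MODEL` option is set to the trace v1 model, span attributes
    // are flattened into individual columns; otherwise they are stored as JSON. This decides
    // how tag filters are built (see `tags_filters`).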
    let is_data_model_v1 = table
        .table_info()
        .meta
        .options
        .extra_options
        .get(TABLE_DATA_MODEL)
        .map(|s| s.as_str())
        == Some(TABLE_DATA_MODEL_TRACE_V1);

    let df_context = create_df_context(query_engine, ctx.clone())?;

    let dataframe = df_context
        .read_table(Arc::new(DfTableProviderAdapter::new(table)))
        .context(DataFusionSnafu)?;

    let dataframe = dataframe.select(selects).context(DataFusionSnafu)?;

    // Apply all filters.
    let dataframe = filters
        .into_iter()
        .chain(tags.map_or(Ok(vec![]), |t| {
            tags_filters(&dataframe, t, is_data_model_v1)
        })?)
        .try_fold(dataframe, |df, expr| {
            df.filter(expr).context(DataFusionSnafu)
        })?;

    // Apply the distinct if needed.
    let dataframe = if !distincts.is_empty() {
        dataframe
            .distinct_on(distincts.clone(), distincts, None)
            .context(DataFusionSnafu)?
    } else {
        dataframe
    };

    // Apply the sorts if needed.
    let dataframe = if !sorts.is_empty() {
        dataframe.sort(sorts).context(DataFusionSnafu)?
    } else {
        dataframe
    };

    // Apply the limit if needed.
    let dataframe = if let Some(limit) = limit {
        dataframe.limit(0, Some(limit)).context(DataFusionSnafu)?
    } else {
        dataframe
    };

    // Execute the query and collect the result.
    let stream = dataframe.execute_stream().await.context(DataFusionSnafu)?;

    let output = Output::new_with_stream(Box::pin(
        RecordBatchStreamAdapter::try_new(stream).context(CollectRecordbatchSnafu)?,
    ));

    Ok(output)
}

// The current implementation registers UDFs during the planning stage, which makes it difficult
// to utilize them through DataFrame APIs. To address this limitation, we create a new session
// context and register the required UDFs, allowing them to be decoupled from the global context.
// TODO(zyy17): Is it possible or necessary to reuse the existing session context?
fn create_df_context(
    query_engine: &QueryEngineRef,
    ctx: QueryContextRef,
) -> ServerResult<SessionContext> {
    let df_context = SessionContext::new_with_state(
        SessionStateBuilder::new_from_existing(query_engine.engine_state().session_state()).build(),
    );

    // The following JSON UDFs are used for tag filters on the v0 data model.
    let udfs: Vec<FunctionRef> = vec![
        Arc::new(JsonGetInt),
        Arc::new(JsonGetFloat),
        Arc::new(JsonGetBool),
        Arc::new(JsonGetString),
    ];

    for udf in udfs {
        df_context.register_udf(create_udf(
            udf,
            ctx.clone(),
            Arc::new(FunctionState::default()),
        ));
    }

    Ok(df_context)
}

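/// Builds tag filters for the v0 data model, where span attributes are stored as JSON: each
/// tag is matched by calling the corresponding `json_get_*` UDF on the span attributes column.
/// For example (illustrative), the tag `{"http.status_code": 200}` becomes a filter like
/// `json_get_int(span_attributes, '["http.status_code"]') = 200`.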
fn json_tag_filters(
    dataframe: &DataFrame,
    tags: HashMap<String, JsonValue>,
) -> ServerResult<Vec<Expr>> {
    let mut filters = vec![];

    // NOTE: Tag keys may contain `.` (for example `http.status_code`), so the JSON path must
    // quote the key as `["http.status_code"]` to access the value.
    for (key, value) in tags.iter() {
        if let JsonValue::String(value) = value {
            filters.push(
                dataframe
                    .registry()
                    .udf(JsonGetString {}.name())
                    .context(DataFusionSnafu)?
                    .call(vec![
                        col(SPAN_ATTRIBUTES_COLUMN),
                        lit(format!("[\"{}\"]", key)),
                    ])
                    .eq(lit(value)),
            );
        }
        if let JsonValue::Number(value) = value {
            if value.is_i64() {
                filters.push(
                    dataframe
                        .registry()
                        .udf(JsonGetInt {}.name())
                        .context(DataFusionSnafu)?
                        .call(vec![
                            col(SPAN_ATTRIBUTES_COLUMN),
                            lit(format!("[\"{}\"]", key)),
                        ])
                        .eq(lit(value.as_i64().unwrap())),
                );
            }
            if value.is_f64() {
                filters.push(
                    dataframe
                        .registry()
                        .udf(JsonGetFloat {}.name())
                        .context(DataFusionSnafu)?
                        .call(vec![
                            col(SPAN_ATTRIBUTES_COLUMN),
                            lit(format!("[\"{}\"]", key)),
                        ])
                        .eq(lit(value.as_f64().unwrap())),
                );
            }
        }
        if let JsonValue::Bool(value) = value {
            filters.push(
                dataframe
                    .registry()
                    .udf(JsonGetBool {}.name())
                    .context(DataFusionSnafu)?
                    .call(vec![
                        col(SPAN_ATTRIBUTES_COLUMN),
                        lit(format!("[\"{}\"]", key)),
                    ])
                    .eq(lit(*value)),
            );
        }
    }

    Ok(filters)
}

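/// Builds tag filters for the v1 data model, where span attributes are flattened into one
/// column per attribute named `"span_attributes.<key>"`. For example (illustrative), the tag
/// `{"http.method": "GET"}` becomes the filter `"span_attributes.http.method" = 'GET'`.
/// Array and object values are not supported and are silently skipped.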
fn flatten_tag_filters(tags: HashMap<String, JsonValue>) -> ServerResult<Vec<Expr>> {
    let filters = tags
        .into_iter()
        .filter_map(|(key, value)| {
            let key = format!("\"span_attributes.{}\"", key);
            match value {
                JsonValue::String(value) => Some(col(key).eq(lit(value))),
                JsonValue::Number(value) => {
                    if value.is_f64() {
                        // safe to unwrap as checked previously
                        Some(col(key).eq(lit(value.as_f64().unwrap())))
                    } else {
                        // Skip numbers that cannot be represented as i64 (e.g. u64 values
                        // above i64::MAX) instead of panicking on `unwrap()`.
                        value.as_i64().map(|value| col(key).eq(lit(value)))
                    }
                }
                JsonValue::Bool(value) => Some(col(key).eq(lit(value))),
                JsonValue::Null => Some(col(key).is_null()),
                // not supported at the moment
                JsonValue::Array(_value) => None,
                JsonValue::Object(_value) => None,
            }
        })
        .collect();
    Ok(filters)
}

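/// Dispatches tag filter construction based on the table's data model: flattened column
/// filters for the v1 data model, JSON UDF filters for the v0 data model.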
fn tags_filters(
    dataframe: &DataFrame,
    tags: HashMap<String, JsonValue>,
    is_data_model_v1: bool,
) -> ServerResult<Vec<Expr>> {
    if is_data_model_v1 {
        flatten_tag_filters(tags)
    } else {
        json_tag_filters(dataframe, tags)
    }
}