1use std::collections::HashMap;
16use std::sync::Arc;
17
18use async_trait::async_trait;
19use catalog::CatalogManagerRef;
20use common_catalog::consts::{TRACE_TABLE_NAME, trace_services_table_name};
21use common_function::function::FunctionRef;
22use common_function::scalars::json::json_get::{
23 JsonGetBool, JsonGetFloat, JsonGetInt, JsonGetString,
24};
25use common_function::scalars::udf::create_udf;
26use common_query::{Output, OutputData};
27use common_recordbatch::adapter::RecordBatchStreamAdapter;
28use common_recordbatch::util;
29use datafusion::dataframe::DataFrame;
30use datafusion::execution::SessionStateBuilder;
31use datafusion::execution::context::SessionContext;
32use datafusion_expr::select_expr::SelectExpr;
33use datafusion_expr::{Expr, SortExpr, col, lit, lit_timestamp_nano, wildcard};
34use datatypes::value::ValueRef;
35use query::QueryEngineRef;
36use serde_json::Value as JsonValue;
37use servers::error::{
38 CatalogSnafu, CollectRecordbatchSnafu, DataFusionSnafu, Result as ServerResult,
39 TableNotFoundSnafu,
40};
41use servers::http::jaeger::{JAEGER_QUERY_TABLE_NAME_KEY, QueryTraceParams};
42use servers::otlp::trace::{
43 DURATION_NANO_COLUMN, SERVICE_NAME_COLUMN, SPAN_ATTRIBUTES_COLUMN, SPAN_KIND_COLUMN,
44 SPAN_KIND_PREFIX, SPAN_NAME_COLUMN, TIMESTAMP_COLUMN, TRACE_ID_COLUMN,
45};
46use servers::query_handler::JaegerQueryHandler;
47use session::context::QueryContextRef;
48use snafu::{OptionExt, ResultExt};
49use table::requests::{TABLE_DATA_MODEL, TABLE_DATA_MODEL_TRACE_V1};
50use table::table::adapter::DfTableProviderAdapter;
51
52use crate::instance::Instance;
53
/// Row cap used by `get_operations`, by `get_trace`, and by the trace-id search in
/// `find_traces` when the request does not specify its own limit.
const DEFAULT_LIMIT: usize = 2_000;
55
56#[async_trait]
57impl JaegerQueryHandler for Instance {
58 async fn get_services(&self, ctx: QueryContextRef) -> ServerResult<Output> {
59 Ok(query_trace_table(
61 ctx,
62 self.catalog_manager(),
63 self.query_engine(),
64 vec![SelectExpr::from(col(SERVICE_NAME_COLUMN))],
65 vec![],
66 vec![],
67 None,
68 None,
69 vec![col(SERVICE_NAME_COLUMN)],
70 )
71 .await?)
72 }
73
74 async fn get_operations(
75 &self,
76 ctx: QueryContextRef,
77 service_name: &str,
78 span_kind: Option<&str>,
79 start_time: Option<i64>,
80 end_time: Option<i64>,
81 ) -> ServerResult<Output> {
82 let mut filters = vec![col(SERVICE_NAME_COLUMN).eq(lit(service_name))];
83
84 if let Some(span_kind) = span_kind {
85 filters.push(col(SPAN_KIND_COLUMN).eq(lit(format!(
86 "{}{}",
87 SPAN_KIND_PREFIX,
88 span_kind.to_uppercase()
89 ))));
90 }
91
92 if let Some(start_time) = start_time {
93 filters.push(col(TIMESTAMP_COLUMN).gt_eq(lit_timestamp_nano(start_time * 1_000)));
95 }
96
97 if let Some(end_time) = end_time {
98 filters.push(col(TIMESTAMP_COLUMN).lt_eq(lit_timestamp_nano(end_time * 1_000)));
100 }
101
102 Ok(query_trace_table(
117 ctx,
118 self.catalog_manager(),
119 self.query_engine(),
120 vec![
121 SelectExpr::from(col(SPAN_NAME_COLUMN)),
122 SelectExpr::from(col(SPAN_KIND_COLUMN)),
123 SelectExpr::from(col(SERVICE_NAME_COLUMN)),
124 SelectExpr::from(col(TIMESTAMP_COLUMN)),
125 ],
126 filters,
127 vec![col(SPAN_NAME_COLUMN).sort(true, false)], Some(DEFAULT_LIMIT),
129 None,
130 vec![col(SPAN_NAME_COLUMN), col(SPAN_KIND_COLUMN)],
131 )
132 .await?)
133 }
134
135 async fn get_trace(
136 &self,
137 ctx: QueryContextRef,
138 trace_id: &str,
139 start_time: Option<i64>,
140 end_time: Option<i64>,
141 ) -> ServerResult<Output> {
142 let selects = vec![wildcard()];
157
158 let mut filters = vec![col(TRACE_ID_COLUMN).eq(lit(trace_id))];
159
160 if let Some(start_time) = start_time {
161 filters.push(col(TIMESTAMP_COLUMN).gt_eq(lit_timestamp_nano(start_time)));
162 }
163
164 if let Some(end_time) = end_time {
165 filters.push(col(TIMESTAMP_COLUMN).lt_eq(lit_timestamp_nano(end_time)));
166 }
167
168 Ok(query_trace_table(
169 ctx,
170 self.catalog_manager(),
171 self.query_engine(),
172 selects,
173 filters,
174 vec![col(TIMESTAMP_COLUMN).sort(false, false)], Some(DEFAULT_LIMIT),
176 None,
177 vec![],
178 )
179 .await?)
180 }
181
182 async fn find_traces(
183 &self,
184 ctx: QueryContextRef,
185 query_params: QueryTraceParams,
186 ) -> ServerResult<Output> {
187 let mut filters = vec![];
188
189 filters.push(col(SERVICE_NAME_COLUMN).eq(lit(query_params.service_name)));
191
192 if let Some(operation_name) = query_params.operation_name {
193 filters.push(col(SPAN_NAME_COLUMN).eq(lit(operation_name)));
194 }
195
196 if let Some(start_time) = query_params.start_time {
197 filters.push(col(TIMESTAMP_COLUMN).gt_eq(lit_timestamp_nano(start_time)));
198 }
199
200 if let Some(end_time) = query_params.end_time {
201 filters.push(col(TIMESTAMP_COLUMN).lt_eq(lit_timestamp_nano(end_time)));
202 }
203
204 if let Some(min_duration) = query_params.min_duration {
205 filters.push(col(DURATION_NANO_COLUMN).gt_eq(lit(min_duration)));
206 }
207
208 if let Some(max_duration) = query_params.max_duration {
209 filters.push(col(DURATION_NANO_COLUMN).lt_eq(lit(max_duration)));
210 }
211
212 let output = query_trace_table(
229 ctx.clone(),
230 self.catalog_manager(),
231 self.query_engine(),
232 vec![wildcard()],
233 filters,
234 vec![],
235 Some(query_params.limit.unwrap_or(DEFAULT_LIMIT)),
236 query_params.tags,
237 vec![col(TRACE_ID_COLUMN)],
238 )
239 .await?;
240
241 let mut filters = vec![
254 col(TRACE_ID_COLUMN).in_list(
255 trace_ids_from_output(output)
256 .await?
257 .iter()
258 .map(lit)
259 .collect::<Vec<Expr>>(),
260 false,
261 ),
262 ];
263
264 if let Some(start_time) = query_params.start_time {
265 filters.push(col(TIMESTAMP_COLUMN).gt_eq(lit_timestamp_nano(start_time)));
266 }
267
268 if let Some(end_time) = query_params.end_time {
269 filters.push(col(TIMESTAMP_COLUMN).lt_eq(lit_timestamp_nano(end_time)));
270 }
271
272 Ok(query_trace_table(
273 ctx,
274 self.catalog_manager(),
275 self.query_engine(),
276 vec![wildcard()],
277 filters,
278 vec![],
279 None,
280 None,
281 vec![],
282 )
283 .await?)
284 }
285}
286
287#[allow(clippy::too_many_arguments)]
288async fn query_trace_table(
289 ctx: QueryContextRef,
290 catalog_manager: &CatalogManagerRef,
291 query_engine: &QueryEngineRef,
292 selects: Vec<SelectExpr>,
293 filters: Vec<Expr>,
294 sorts: Vec<SortExpr>,
295 limit: Option<usize>,
296 tags: Option<HashMap<String, JsonValue>>,
297 distincts: Vec<Expr>,
298) -> ServerResult<Output> {
299 let trace_table_name = ctx
300 .extension(JAEGER_QUERY_TABLE_NAME_KEY)
301 .unwrap_or(TRACE_TABLE_NAME);
302
303 let table_name = {
305 if match selects.as_slice() {
306 [SelectExpr::Expression(x)] => x == &col(SERVICE_NAME_COLUMN),
307 _ => false,
308 } {
309 &trace_services_table_name(trace_table_name)
310 } else {
311 trace_table_name
312 }
313 };
314
315 let table = catalog_manager
316 .table(
317 ctx.current_catalog(),
318 &ctx.current_schema(),
319 table_name,
320 Some(&ctx),
321 )
322 .await
323 .context(CatalogSnafu)?
324 .with_context(|| TableNotFoundSnafu {
325 table: table_name,
326 catalog: ctx.current_catalog(),
327 schema: ctx.current_schema(),
328 })?;
329
330 let is_data_model_v1 = table
331 .table_info()
332 .meta
333 .options
334 .extra_options
335 .get(TABLE_DATA_MODEL)
336 .map(|s| s.as_str())
337 == Some(TABLE_DATA_MODEL_TRACE_V1);
338
339 let df_context = create_df_context(query_engine)?;
340
341 let dataframe = df_context
342 .read_table(Arc::new(DfTableProviderAdapter::new(table)))
343 .context(DataFusionSnafu)?;
344
345 let dataframe = dataframe.select(selects).context(DataFusionSnafu)?;
346
347 let dataframe = filters
349 .into_iter()
350 .chain(tags.map_or(Ok(vec![]), |t| {
351 tags_filters(&dataframe, t, is_data_model_v1)
352 })?)
353 .try_fold(dataframe, |df, expr| {
354 df.filter(expr).context(DataFusionSnafu)
355 })?;
356
357 let dataframe = if !distincts.is_empty() {
359 dataframe
360 .distinct_on(distincts.clone(), distincts, None)
361 .context(DataFusionSnafu)?
362 } else {
363 dataframe
364 };
365
366 let dataframe = if !sorts.is_empty() {
368 dataframe.sort(sorts).context(DataFusionSnafu)?
369 } else {
370 dataframe
371 };
372
373 let dataframe = if let Some(limit) = limit {
375 dataframe.limit(0, Some(limit)).context(DataFusionSnafu)?
376 } else {
377 dataframe
378 };
379
380 let stream = dataframe.execute_stream().await.context(DataFusionSnafu)?;
382
383 let output = Output::new_with_stream(Box::pin(
384 RecordBatchStreamAdapter::try_new(stream).context(CollectRecordbatchSnafu)?,
385 ));
386
387 Ok(output)
388}
389
390fn create_df_context(query_engine: &QueryEngineRef) -> ServerResult<SessionContext> {
395 let df_context = SessionContext::new_with_state(
396 SessionStateBuilder::new_from_existing(query_engine.engine_state().session_state()).build(),
397 );
398
399 let udfs: Vec<FunctionRef> = vec![
401 Arc::new(JsonGetInt::default()),
402 Arc::new(JsonGetFloat::default()),
403 Arc::new(JsonGetBool::default()),
404 Arc::new(JsonGetString::default()),
405 ];
406
407 for udf in udfs {
408 df_context.register_udf(create_udf(udf));
409 }
410
411 Ok(df_context)
412}
413
414fn json_tag_filters(
415 dataframe: &DataFrame,
416 tags: HashMap<String, JsonValue>,
417) -> ServerResult<Vec<Expr>> {
418 let mut filters = vec![];
419
420 for (key, value) in tags.iter() {
422 if let JsonValue::String(value) = value {
423 filters.push(
424 dataframe
425 .registry()
426 .udf(JsonGetString::NAME)
427 .context(DataFusionSnafu)?
428 .call(vec![
429 col(SPAN_ATTRIBUTES_COLUMN),
430 lit(format!("[\"{}\"]", key)),
431 ])
432 .eq(lit(value)),
433 );
434 }
435 if let JsonValue::Number(value) = value {
436 if value.is_i64() {
437 filters.push(
438 dataframe
439 .registry()
440 .udf(JsonGetInt::NAME)
441 .context(DataFusionSnafu)?
442 .call(vec![
443 col(SPAN_ATTRIBUTES_COLUMN),
444 lit(format!("[\"{}\"]", key)),
445 ])
446 .eq(lit(value.as_i64().unwrap())),
447 );
448 }
449 if value.is_f64() {
450 filters.push(
451 dataframe
452 .registry()
453 .udf(JsonGetFloat::NAME)
454 .context(DataFusionSnafu)?
455 .call(vec![
456 col(SPAN_ATTRIBUTES_COLUMN),
457 lit(format!("[\"{}\"]", key)),
458 ])
459 .eq(lit(value.as_f64().unwrap())),
460 );
461 }
462 }
463 if let JsonValue::Bool(value) = value {
464 filters.push(
465 dataframe
466 .registry()
467 .udf(JsonGetBool::NAME)
468 .context(DataFusionSnafu)?
469 .call(vec![
470 col(SPAN_ATTRIBUTES_COLUMN),
471 lit(format!("[\"{}\"]", key)),
472 ])
473 .eq(lit(*value)),
474 );
475 }
476 }
477
478 Ok(filters)
479}
480
481fn flatten_tag_filters(tags: HashMap<String, JsonValue>) -> ServerResult<Vec<Expr>> {
482 let filters = tags
483 .into_iter()
484 .filter_map(|(key, value)| {
485 let key = format!("\"span_attributes.{}\"", key);
486 match value {
487 JsonValue::String(value) => Some(col(key).eq(lit(value))),
488 JsonValue::Number(value) => {
489 if value.is_f64() {
490 Some(col(key).eq(lit(value.as_f64().unwrap())))
492 } else {
493 Some(col(key).eq(lit(value.as_i64().unwrap())))
494 }
495 }
496 JsonValue::Bool(value) => Some(col(key).eq(lit(value))),
497 JsonValue::Null => Some(col(key).is_null()),
498 JsonValue::Array(_value) => None,
500 JsonValue::Object(_value) => None,
501 }
502 })
503 .collect();
504 Ok(filters)
505}
506
507fn tags_filters(
508 dataframe: &DataFrame,
509 tags: HashMap<String, JsonValue>,
510 is_data_model_v1: bool,
511) -> ServerResult<Vec<Expr>> {
512 if is_data_model_v1 {
513 flatten_tag_filters(tags)
514 } else {
515 json_tag_filters(dataframe, tags)
516 }
517}
518
519async fn trace_ids_from_output(output: Output) -> ServerResult<Vec<String>> {
521 if let OutputData::Stream(stream) = output.data {
522 let schema = stream.schema().clone();
523 let recordbatches = util::collect(stream)
524 .await
525 .context(CollectRecordbatchSnafu)?;
526
527 if !recordbatches.is_empty()
529 && schema.num_columns() == 1
530 && schema.contains_column(TRACE_ID_COLUMN)
531 {
532 let mut trace_ids = vec![];
533 for recordbatch in recordbatches {
534 for col in recordbatch.columns().iter() {
535 for row_idx in 0..recordbatch.num_rows() {
536 if let ValueRef::String(value) = col.get_ref(row_idx) {
537 trace_ids.push(value.to_string());
538 }
539 }
540 }
541 }
542
543 return Ok(trace_ids);
544 }
545 }
546
547 Ok(vec![])
548}