use std::collections::HashMap;
use std::sync::Arc;

use async_trait::async_trait;
use catalog::CatalogManagerRef;
use common_catalog::consts::{
    TRACE_TABLE_NAME, trace_operations_table_name, trace_services_table_name,
};
use common_function::function::FunctionRef;
use common_function::scalars::json::json_get::{
    JsonGetBool, JsonGetFloat, JsonGetInt, JsonGetString,
};
use common_function::scalars::udf::create_udf;
use common_query::{Output, OutputData};
use common_recordbatch::adapter::RecordBatchStreamAdapter;
use common_recordbatch::util;
use datafusion::dataframe::DataFrame;
use datafusion::execution::SessionStateBuilder;
use datafusion::execution::context::SessionContext;
use datafusion_expr::select_expr::SelectExpr;
use datafusion_expr::{Expr, SortExpr, col, lit, lit_timestamp_nano, wildcard};
use datatypes::value::ValueRef;
use query::QueryEngineRef;
use serde_json::Value as JsonValue;
use servers::error::{
    CatalogSnafu, CollectRecordbatchSnafu, DataFusionSnafu, Result as ServerResult,
    TableNotFoundSnafu,
};
use servers::http::jaeger::{JAEGER_QUERY_TABLE_NAME_KEY, QueryTraceParams};
use servers::otlp::trace::{
    DURATION_NANO_COLUMN, SERVICE_NAME_COLUMN, SPAN_ATTRIBUTES_COLUMN, SPAN_KIND_COLUMN,
    SPAN_KIND_PREFIX, SPAN_NAME_COLUMN, TIMESTAMP_COLUMN, TRACE_ID_COLUMN,
};
use servers::query_handler::JaegerQueryHandler;
use session::context::QueryContextRef;
use snafu::{OptionExt, ResultExt};
use table::requests::{TABLE_DATA_MODEL, TABLE_DATA_MODEL_TRACE_V1};
use table::table::adapter::DfTableProviderAdapter;

use crate::instance::Instance;

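/// Default upper bound on the number of rows a Jaeger query returns when the
/// caller does not supply an explicit limit.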
const DEFAULT_LIMIT: usize = 2000;

#[async_trait]
impl JaegerQueryHandler for Instance {
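    /// Returns the distinct service names stored in the trace table. Backs
    /// Jaeger's `GET /api/services` endpoint.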
    async fn get_services(&self, ctx: QueryContextRef) -> ServerResult<Output> {
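        // Effectively `SELECT DISTINCT service_name`; `query_trace_table`
        // routes this shape of query to the derived services table.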
        Ok(query_trace_table(
            ctx,
            self.catalog_manager(),
            self.query_engine(),
            vec![SelectExpr::from(col(SERVICE_NAME_COLUMN))],
            vec![],
            vec![],
            None,
            None,
            vec![col(SERVICE_NAME_COLUMN)],
        )
        .await?)
    }

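    /// Returns the distinct operations (span name and kind) recorded for a
    /// service, optionally restricted to a single span kind.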
    async fn get_operations(
        &self,
        ctx: QueryContextRef,
        service_name: &str,
        span_kind: Option<&str>,
    ) -> ServerResult<Output> {
        let mut filters = vec![col(SERVICE_NAME_COLUMN).eq(lit(service_name))];

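        // Stored span kinds are uppercase and carry `SPAN_KIND_PREFIX`, so
        // normalize the requested kind to match before filtering.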
        if let Some(span_kind) = span_kind {
            filters.push(col(SPAN_KIND_COLUMN).eq(lit(format!(
                "{}{}",
                SPAN_KIND_PREFIX,
                span_kind.to_uppercase()
            ))));
        }

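        // Effectively `SELECT DISTINCT span_name, span_kind ... ORDER BY
        // span_name`, routed to the derived operations table.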
        Ok(query_trace_table(
            ctx,
            self.catalog_manager(),
            self.query_engine(),
            vec![
                SelectExpr::from(col(SPAN_NAME_COLUMN)),
                SelectExpr::from(col(SPAN_KIND_COLUMN)),
                SelectExpr::from(col(SERVICE_NAME_COLUMN)),
                SelectExpr::from(col(TIMESTAMP_COLUMN)),
            ],
            filters,
            vec![col(SPAN_NAME_COLUMN).sort(true, false)],
            Some(DEFAULT_LIMIT),
            None,
            vec![col(SPAN_NAME_COLUMN), col(SPAN_KIND_COLUMN)],
        )
        .await?)
    }

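    /// Returns all spans of the trace identified by `trace_id`, optionally
    /// constrained to a time range. Backs Jaeger's `GET /api/traces/{trace_id}`.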
    async fn get_trace(
        &self,
        ctx: QueryContextRef,
        trace_id: &str,
        start_time: Option<i64>,
        end_time: Option<i64>,
    ) -> ServerResult<Output> {
        let selects = vec![wildcard()];

        let mut filters = vec![col(TRACE_ID_COLUMN).eq(lit(trace_id))];

        if let Some(start_time) = start_time {
            filters.push(col(TIMESTAMP_COLUMN).gt_eq(lit_timestamp_nano(start_time)));
        }

        if let Some(end_time) = end_time {
            filters.push(col(TIMESTAMP_COLUMN).lt_eq(lit_timestamp_nano(end_time)));
        }

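        // Fetch every span of the trace, newest first.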
        Ok(query_trace_table(
            ctx,
            self.catalog_manager(),
            self.query_engine(),
            selects,
            filters,
            vec![col(TIMESTAMP_COLUMN).sort(false, false)],
            Some(DEFAULT_LIMIT),
            None,
            vec![],
        )
        .await?)
    }

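    /// Searches for traces matching `query_params` (Jaeger's `GET /api/traces`
    /// search). Runs in two phases: first resolve the matching trace IDs, then
    /// fetch every span of those traces.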
    async fn find_traces(
        &self,
        ctx: QueryContextRef,
        query_params: QueryTraceParams,
    ) -> ServerResult<Output> {
        let mut filters = vec![];

        filters.push(col(SERVICE_NAME_COLUMN).eq(lit(query_params.service_name)));

        if let Some(operation_name) = query_params.operation_name {
            filters.push(col(SPAN_NAME_COLUMN).eq(lit(operation_name)));
        }

        if let Some(start_time) = query_params.start_time {
            filters.push(col(TIMESTAMP_COLUMN).gt_eq(lit_timestamp_nano(start_time)));
        }

        if let Some(end_time) = query_params.end_time {
            filters.push(col(TIMESTAMP_COLUMN).lt_eq(lit_timestamp_nano(end_time)));
        }

        if let Some(min_duration) = query_params.min_duration {
            filters.push(col(DURATION_NANO_COLUMN).gt_eq(lit(min_duration)));
        }

        if let Some(max_duration) = query_params.max_duration {
            filters.push(col(DURATION_NANO_COLUMN).lt_eq(lit(max_duration)));
        }

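        // Phase one: resolve the distinct trace IDs whose spans satisfy all
        // of the filters, honoring the caller's limit.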
        let output = query_trace_table(
            ctx.clone(),
            self.catalog_manager(),
            self.query_engine(),
            vec![wildcard()],
            filters,
            vec![],
            Some(query_params.limit.unwrap_or(DEFAULT_LIMIT)),
            query_params.tags,
            vec![col(TRACE_ID_COLUMN)],
        )
        .await?;

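        // Phase two: fetch all spans belonging to the matched trace IDs. The
        // time-range filters are reapplied to narrow the second scan.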
        let mut filters = vec![
            col(TRACE_ID_COLUMN).in_list(
                trace_ids_from_output(output)
                    .await?
                    .iter()
                    .map(lit)
                    .collect::<Vec<Expr>>(),
                false,
            ),
        ];

        if let Some(start_time) = query_params.start_time {
            filters.push(col(TIMESTAMP_COLUMN).gt_eq(lit_timestamp_nano(start_time)));
        }

        if let Some(end_time) = query_params.end_time {
            filters.push(col(TIMESTAMP_COLUMN).lt_eq(lit_timestamp_nano(end_time)));
        }

        Ok(query_trace_table(
            ctx,
            self.catalog_manager(),
            self.query_engine(),
            vec![wildcard()],
            filters,
            vec![],
            None,
            None,
            vec![],
        )
        .await?)
    }
}

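/// Builds and executes a DataFusion query over the trace table (or one of its
/// derived services/operations tables), applying the given projections,
/// filters, tag predicates, `DISTINCT ON` columns, sorts, and limit in that
/// order.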
#[allow(clippy::too_many_arguments)]
async fn query_trace_table(
    ctx: QueryContextRef,
    catalog_manager: &CatalogManagerRef,
    query_engine: &QueryEngineRef,
    selects: Vec<SelectExpr>,
    filters: Vec<Expr>,
    sorts: Vec<SortExpr>,
    limit: Option<usize>,
    tags: Option<HashMap<String, JsonValue>>,
    distincts: Vec<Expr>,
) -> ServerResult<Output> {
    let trace_table_name = ctx
        .extension(JAEGER_QUERY_TABLE_NAME_KEY)
        .unwrap_or(TRACE_TABLE_NAME);

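    // Route pure service-name lookups and (span_name, span_kind) lookups to
    // the companion services/operations tables derived from the trace table
    // name; everything else hits the trace table itself.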
    let table_name = {
        if match selects.as_slice() {
            [SelectExpr::Expression(x)] => x == &col(SERVICE_NAME_COLUMN),
            _ => false,
        } {
            &trace_services_table_name(trace_table_name)
        } else if !distincts.is_empty()
            && distincts.contains(&col(SPAN_NAME_COLUMN))
            && distincts.contains(&col(SPAN_KIND_COLUMN))
        {
            &trace_operations_table_name(trace_table_name)
        } else {
            trace_table_name
        }
    };

    let table = catalog_manager
        .table(
            ctx.current_catalog(),
            &ctx.current_schema(),
            table_name,
            Some(&ctx),
        )
        .await
        .context(CatalogSnafu)?
        .with_context(|| TableNotFoundSnafu {
            table: table_name,
            catalog: ctx.current_catalog(),
            schema: ctx.current_schema(),
        })?;

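    // Trace data model v1 flattens span attributes into per-key columns
    // instead of a single JSON column, which changes how tag filters are built.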
    let is_data_model_v1 = table
        .table_info()
        .meta
        .options
        .extra_options
        .get(TABLE_DATA_MODEL)
        .map(|s| s.as_str())
        == Some(TABLE_DATA_MODEL_TRACE_V1);

    let df_context = create_df_context(query_engine)?;

    let dataframe = df_context
        .read_table(Arc::new(DfTableProviderAdapter::new(table)))
        .context(DataFusionSnafu)?;

    let dataframe = dataframe.select(selects).context(DataFusionSnafu)?;

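    // Apply the explicit filters plus any filters derived from the tags.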
    let dataframe = filters
        .into_iter()
        .chain(tags.map_or(Ok(vec![]), |t| {
            tags_filters(&dataframe, t, is_data_model_v1)
        })?)
        .try_fold(dataframe, |df, expr| {
            df.filter(expr).context(DataFusionSnafu)
        })?;

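    // Apply `DISTINCT ON` when distinct columns were requested.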
    let dataframe = if !distincts.is_empty() {
        dataframe
            .distinct_on(distincts.clone(), distincts, None)
            .context(DataFusionSnafu)?
    } else {
        dataframe
    };

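    // Apply the sorts if any.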
    let dataframe = if !sorts.is_empty() {
        dataframe.sort(sorts).context(DataFusionSnafu)?
    } else {
        dataframe
    };

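    // Apply the limit if given.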
    let dataframe = if let Some(limit) = limit {
        dataframe.limit(0, Some(limit)).context(DataFusionSnafu)?
    } else {
        dataframe
    };

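    // Execute the plan and wrap the resulting record batch stream as the output.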
    let stream = dataframe.execute_stream().await.context(DataFusionSnafu)?;

    let output = Output::new_with_stream(Box::pin(
        RecordBatchStreamAdapter::try_new(stream).context(CollectRecordbatchSnafu)?,
    ));

    Ok(output)
}

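/// Creates a `SessionContext` from the query engine's existing session state,
/// with the JSON extraction UDFs registered so tag filters can be evaluated.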
fn create_df_context(query_engine: &QueryEngineRef) -> ServerResult<SessionContext> {
    let df_context = SessionContext::new_with_state(
        SessionStateBuilder::new_from_existing(query_engine.engine_state().session_state()).build(),
    );

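    // UDFs for extracting typed values from the span attributes JSON column.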
    let udfs: Vec<FunctionRef> = vec![
        Arc::new(JsonGetInt::default()),
        Arc::new(JsonGetFloat::default()),
        Arc::new(JsonGetBool::default()),
        Arc::new(JsonGetString::default()),
    ];

    for udf in udfs {
        df_context.register_udf(create_udf(udf));
    }

    Ok(df_context)
}

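/// Builds tag filters for the JSON data model by calling the JSON extraction
/// UDFs on the span attributes column with a path of the form `["<key>"]` and
/// comparing the extracted value against the requested tag value.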
fn json_tag_filters(
    dataframe: &DataFrame,
    tags: HashMap<String, JsonValue>,
) -> ServerResult<Vec<Expr>> {
    let mut filters = vec![];

    for (key, value) in tags.iter() {
        if let JsonValue::String(value) = value {
            filters.push(
                dataframe
                    .registry()
                    .udf(JsonGetString::NAME)
                    .context(DataFusionSnafu)?
                    .call(vec![
                        col(SPAN_ATTRIBUTES_COLUMN),
                        lit(format!("[\"{}\"]", key)),
                    ])
                    .eq(lit(value)),
            );
        }
        if let JsonValue::Number(value) = value {
            if value.is_i64() {
                filters.push(
                    dataframe
                        .registry()
                        .udf(JsonGetInt::NAME)
                        .context(DataFusionSnafu)?
                        .call(vec![
                            col(SPAN_ATTRIBUTES_COLUMN),
                            lit(format!("[\"{}\"]", key)),
                        ])
                        .eq(lit(value.as_i64().unwrap())),
                );
            }
            if value.is_f64() {
                filters.push(
                    dataframe
                        .registry()
                        .udf(JsonGetFloat::NAME)
                        .context(DataFusionSnafu)?
                        .call(vec![
                            col(SPAN_ATTRIBUTES_COLUMN),
                            lit(format!("[\"{}\"]", key)),
                        ])
                        .eq(lit(value.as_f64().unwrap())),
                );
            }
        }
        if let JsonValue::Bool(value) = value {
            filters.push(
                dataframe
                    .registry()
                    .udf(JsonGetBool::NAME)
                    .context(DataFusionSnafu)?
                    .call(vec![
                        col(SPAN_ATTRIBUTES_COLUMN),
                        lit(format!("[\"{}\"]", key)),
                    ])
                    .eq(lit(*value)),
            );
        }
    }

    Ok(filters)
}

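/// Builds tag filters for the v1 data model, where each span attribute is
/// flattened into its own column named `"span_attributes.<key>"`. Array and
/// object values are not supported and are silently skipped.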
fn flatten_tag_filters(tags: HashMap<String, JsonValue>) -> ServerResult<Vec<Expr>> {
    let filters = tags
        .into_iter()
        .filter_map(|(key, value)| {
            // Quote the column name so the dot in the key is kept as part of
            // the identifier rather than parsed as a path separator.
            let key = format!("\"span_attributes.{}\"", key);
            match value {
                JsonValue::String(value) => Some(col(key).eq(lit(value))),
                JsonValue::Number(value) => {
                    if value.is_f64() {
                        Some(col(key).eq(lit(value.as_f64().unwrap())))
                    } else {
                        Some(col(key).eq(lit(value.as_i64().unwrap())))
                    }
                }
                JsonValue::Bool(value) => Some(col(key).eq(lit(value))),
                JsonValue::Null => Some(col(key).is_null()),
                JsonValue::Array(_value) => None,
                JsonValue::Object(_value) => None,
            }
        })
        .collect();
    Ok(filters)
}

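/// Dispatches tag-filter construction based on the table's data model.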
fn tags_filters(
    dataframe: &DataFrame,
    tags: HashMap<String, JsonValue>,
    is_data_model_v1: bool,
) -> ServerResult<Vec<Expr>> {
    if is_data_model_v1 {
        flatten_tag_filters(tags)
    } else {
        json_tag_filters(dataframe, tags)
    }
}

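/// Collects the trace IDs from the output of a query that selected only the
/// `trace_id` column; any other output shape yields an empty list.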
async fn trace_ids_from_output(output: Output) -> ServerResult<Vec<String>> {
    if let OutputData::Stream(stream) = output.data {
        let schema = stream.schema().clone();
        let recordbatches = util::collect(stream)
            .await
            .context(CollectRecordbatchSnafu)?;

        if !recordbatches.is_empty()
            && schema.num_columns() == 1
            && schema.contains_column(TRACE_ID_COLUMN)
        {
            let mut trace_ids = vec![];
            for recordbatch in recordbatches {
                for col in recordbatch.columns().iter() {
                    for row_idx in 0..recordbatch.num_rows() {
                        if let ValueRef::String(value) = col.get_ref(row_idx) {
                            trace_ids.push(value.to_string());
                        }
                    }
                }
            }

            return Ok(trace_ids);
        }
    }

    Ok(vec![])
}