1use std::collections::HashMap;
16use std::sync::Arc;
17
18use async_trait::async_trait;
19use catalog::CatalogManagerRef;
20use common_catalog::consts::{TRACE_TABLE_NAME, trace_services_table_name};
21use common_function::function::{Function, FunctionRef};
22use common_function::scalars::json::json_get::{
23 JsonGetBool, JsonGetFloat, JsonGetInt, JsonGetString,
24};
25use common_function::scalars::udf::create_udf;
26use common_function::state::FunctionState;
27use common_query::{Output, OutputData};
28use common_recordbatch::adapter::RecordBatchStreamAdapter;
29use common_recordbatch::util;
30use datafusion::dataframe::DataFrame;
31use datafusion::execution::SessionStateBuilder;
32use datafusion::execution::context::SessionContext;
33use datafusion_expr::select_expr::SelectExpr;
34use datafusion_expr::{Expr, SortExpr, col, lit, lit_timestamp_nano, wildcard};
35use datatypes::value::ValueRef;
36use query::QueryEngineRef;
37use serde_json::Value as JsonValue;
38use servers::error::{
39 CatalogSnafu, CollectRecordbatchSnafu, DataFusionSnafu, Result as ServerResult,
40 TableNotFoundSnafu,
41};
42use servers::http::jaeger::{JAEGER_QUERY_TABLE_NAME_KEY, QueryTraceParams};
43use servers::otlp::trace::{
44 DURATION_NANO_COLUMN, SERVICE_NAME_COLUMN, SPAN_ATTRIBUTES_COLUMN, SPAN_KIND_COLUMN,
45 SPAN_KIND_PREFIX, SPAN_NAME_COLUMN, TIMESTAMP_COLUMN, TRACE_ID_COLUMN,
46};
47use servers::query_handler::JaegerQueryHandler;
48use session::context::QueryContextRef;
49use snafu::{OptionExt, ResultExt};
50use table::requests::{TABLE_DATA_MODEL, TABLE_DATA_MODEL_TRACE_V1};
51use table::table::adapter::DfTableProviderAdapter;
52
53use crate::instance::Instance;
54
/// Row cap applied by the Jaeger queries below: always for operations and
/// single-trace lookups, and for trace search when the request sets no limit.
const DEFAULT_LIMIT: usize = 2_000;
56
57#[async_trait]
58impl JaegerQueryHandler for Instance {
59 async fn get_services(&self, ctx: QueryContextRef) -> ServerResult<Output> {
60 Ok(query_trace_table(
62 ctx,
63 self.catalog_manager(),
64 self.query_engine(),
65 vec![SelectExpr::from(col(SERVICE_NAME_COLUMN))],
66 vec![],
67 vec![],
68 None,
69 None,
70 vec![col(SERVICE_NAME_COLUMN)],
71 )
72 .await?)
73 }
74
75 async fn get_operations(
76 &self,
77 ctx: QueryContextRef,
78 service_name: &str,
79 span_kind: Option<&str>,
80 start_time: Option<i64>,
81 end_time: Option<i64>,
82 ) -> ServerResult<Output> {
83 let mut filters = vec![col(SERVICE_NAME_COLUMN).eq(lit(service_name))];
84
85 if let Some(span_kind) = span_kind {
86 filters.push(col(SPAN_KIND_COLUMN).eq(lit(format!(
87 "{}{}",
88 SPAN_KIND_PREFIX,
89 span_kind.to_uppercase()
90 ))));
91 }
92
93 if let Some(start_time) = start_time {
94 filters.push(col(TIMESTAMP_COLUMN).gt_eq(lit_timestamp_nano(start_time * 1_000)));
96 }
97
98 if let Some(end_time) = end_time {
99 filters.push(col(TIMESTAMP_COLUMN).lt_eq(lit_timestamp_nano(end_time * 1_000)));
101 }
102
103 Ok(query_trace_table(
118 ctx,
119 self.catalog_manager(),
120 self.query_engine(),
121 vec![
122 SelectExpr::from(col(SPAN_NAME_COLUMN)),
123 SelectExpr::from(col(SPAN_KIND_COLUMN)),
124 SelectExpr::from(col(SERVICE_NAME_COLUMN)),
125 SelectExpr::from(col(TIMESTAMP_COLUMN)),
126 ],
127 filters,
128 vec![col(SPAN_NAME_COLUMN).sort(true, false)], Some(DEFAULT_LIMIT),
130 None,
131 vec![col(SPAN_NAME_COLUMN), col(SPAN_KIND_COLUMN)],
132 )
133 .await?)
134 }
135
136 async fn get_trace(
137 &self,
138 ctx: QueryContextRef,
139 trace_id: &str,
140 start_time: Option<i64>,
141 end_time: Option<i64>,
142 ) -> ServerResult<Output> {
143 let selects = vec![wildcard()];
158
159 let mut filters = vec![col(TRACE_ID_COLUMN).eq(lit(trace_id))];
160
161 if let Some(start_time) = start_time {
162 filters.push(col(TIMESTAMP_COLUMN).gt_eq(lit_timestamp_nano(start_time)));
163 }
164
165 if let Some(end_time) = end_time {
166 filters.push(col(TIMESTAMP_COLUMN).lt_eq(lit_timestamp_nano(end_time)));
167 }
168
169 Ok(query_trace_table(
170 ctx,
171 self.catalog_manager(),
172 self.query_engine(),
173 selects,
174 filters,
175 vec![col(TIMESTAMP_COLUMN).sort(false, false)], Some(DEFAULT_LIMIT),
177 None,
178 vec![],
179 )
180 .await?)
181 }
182
183 async fn find_traces(
184 &self,
185 ctx: QueryContextRef,
186 query_params: QueryTraceParams,
187 ) -> ServerResult<Output> {
188 let mut filters = vec![];
189
190 filters.push(col(SERVICE_NAME_COLUMN).eq(lit(query_params.service_name)));
192
193 if let Some(operation_name) = query_params.operation_name {
194 filters.push(col(SPAN_NAME_COLUMN).eq(lit(operation_name)));
195 }
196
197 if let Some(start_time) = query_params.start_time {
198 filters.push(col(TIMESTAMP_COLUMN).gt_eq(lit_timestamp_nano(start_time)));
199 }
200
201 if let Some(end_time) = query_params.end_time {
202 filters.push(col(TIMESTAMP_COLUMN).lt_eq(lit_timestamp_nano(end_time)));
203 }
204
205 if let Some(min_duration) = query_params.min_duration {
206 filters.push(col(DURATION_NANO_COLUMN).gt_eq(lit(min_duration)));
207 }
208
209 if let Some(max_duration) = query_params.max_duration {
210 filters.push(col(DURATION_NANO_COLUMN).lt_eq(lit(max_duration)));
211 }
212
213 let output = query_trace_table(
230 ctx.clone(),
231 self.catalog_manager(),
232 self.query_engine(),
233 vec![wildcard()],
234 filters,
235 vec![],
236 Some(query_params.limit.unwrap_or(DEFAULT_LIMIT)),
237 query_params.tags,
238 vec![col(TRACE_ID_COLUMN)],
239 )
240 .await?;
241
242 let mut filters = vec![
255 col(TRACE_ID_COLUMN).in_list(
256 trace_ids_from_output(output)
257 .await?
258 .iter()
259 .map(lit)
260 .collect::<Vec<Expr>>(),
261 false,
262 ),
263 ];
264
265 if let Some(start_time) = query_params.start_time {
266 filters.push(col(TIMESTAMP_COLUMN).gt_eq(lit_timestamp_nano(start_time)));
267 }
268
269 if let Some(end_time) = query_params.end_time {
270 filters.push(col(TIMESTAMP_COLUMN).lt_eq(lit_timestamp_nano(end_time)));
271 }
272
273 Ok(query_trace_table(
274 ctx,
275 self.catalog_manager(),
276 self.query_engine(),
277 vec![wildcard()],
278 filters,
279 vec![],
280 None,
281 None,
282 vec![],
283 )
284 .await?)
285 }
286}
287
288#[allow(clippy::too_many_arguments)]
289async fn query_trace_table(
290 ctx: QueryContextRef,
291 catalog_manager: &CatalogManagerRef,
292 query_engine: &QueryEngineRef,
293 selects: Vec<SelectExpr>,
294 filters: Vec<Expr>,
295 sorts: Vec<SortExpr>,
296 limit: Option<usize>,
297 tags: Option<HashMap<String, JsonValue>>,
298 distincts: Vec<Expr>,
299) -> ServerResult<Output> {
300 let trace_table_name = ctx
301 .extension(JAEGER_QUERY_TABLE_NAME_KEY)
302 .unwrap_or(TRACE_TABLE_NAME);
303
304 let table_name = {
306 if match selects.as_slice() {
307 [SelectExpr::Expression(x)] => x == &col(SERVICE_NAME_COLUMN),
308 _ => false,
309 } {
310 &trace_services_table_name(trace_table_name)
311 } else {
312 trace_table_name
313 }
314 };
315
316 let table = catalog_manager
317 .table(
318 ctx.current_catalog(),
319 &ctx.current_schema(),
320 table_name,
321 Some(&ctx),
322 )
323 .await
324 .context(CatalogSnafu)?
325 .with_context(|| TableNotFoundSnafu {
326 table: table_name,
327 catalog: ctx.current_catalog(),
328 schema: ctx.current_schema(),
329 })?;
330
331 let is_data_model_v1 = table
332 .table_info()
333 .meta
334 .options
335 .extra_options
336 .get(TABLE_DATA_MODEL)
337 .map(|s| s.as_str())
338 == Some(TABLE_DATA_MODEL_TRACE_V1);
339
340 let df_context = create_df_context(query_engine, ctx.clone())?;
341
342 let dataframe = df_context
343 .read_table(Arc::new(DfTableProviderAdapter::new(table)))
344 .context(DataFusionSnafu)?;
345
346 let dataframe = dataframe.select(selects).context(DataFusionSnafu)?;
347
348 let dataframe = filters
350 .into_iter()
351 .chain(tags.map_or(Ok(vec![]), |t| {
352 tags_filters(&dataframe, t, is_data_model_v1)
353 })?)
354 .try_fold(dataframe, |df, expr| {
355 df.filter(expr).context(DataFusionSnafu)
356 })?;
357
358 let dataframe = if !distincts.is_empty() {
360 dataframe
361 .distinct_on(distincts.clone(), distincts, None)
362 .context(DataFusionSnafu)?
363 } else {
364 dataframe
365 };
366
367 let dataframe = if !sorts.is_empty() {
369 dataframe.sort(sorts).context(DataFusionSnafu)?
370 } else {
371 dataframe
372 };
373
374 let dataframe = if let Some(limit) = limit {
376 dataframe.limit(0, Some(limit)).context(DataFusionSnafu)?
377 } else {
378 dataframe
379 };
380
381 let stream = dataframe.execute_stream().await.context(DataFusionSnafu)?;
383
384 let output = Output::new_with_stream(Box::pin(
385 RecordBatchStreamAdapter::try_new(stream).context(CollectRecordbatchSnafu)?,
386 ));
387
388 Ok(output)
389}
390
391fn create_df_context(
396 query_engine: &QueryEngineRef,
397 ctx: QueryContextRef,
398) -> ServerResult<SessionContext> {
399 let df_context = SessionContext::new_with_state(
400 SessionStateBuilder::new_from_existing(query_engine.engine_state().session_state()).build(),
401 );
402
403 let udfs: Vec<FunctionRef> = vec![
405 Arc::new(JsonGetInt),
406 Arc::new(JsonGetFloat),
407 Arc::new(JsonGetBool),
408 Arc::new(JsonGetString),
409 ];
410
411 for udf in udfs {
412 df_context.register_udf(create_udf(
413 udf,
414 ctx.clone(),
415 Arc::new(FunctionState::default()),
416 ));
417 }
418
419 Ok(df_context)
420}
421
422fn json_tag_filters(
423 dataframe: &DataFrame,
424 tags: HashMap<String, JsonValue>,
425) -> ServerResult<Vec<Expr>> {
426 let mut filters = vec![];
427
428 for (key, value) in tags.iter() {
430 if let JsonValue::String(value) = value {
431 filters.push(
432 dataframe
433 .registry()
434 .udf(JsonGetString {}.name())
435 .context(DataFusionSnafu)?
436 .call(vec![
437 col(SPAN_ATTRIBUTES_COLUMN),
438 lit(format!("[\"{}\"]", key)),
439 ])
440 .eq(lit(value)),
441 );
442 }
443 if let JsonValue::Number(value) = value {
444 if value.is_i64() {
445 filters.push(
446 dataframe
447 .registry()
448 .udf(JsonGetInt {}.name())
449 .context(DataFusionSnafu)?
450 .call(vec![
451 col(SPAN_ATTRIBUTES_COLUMN),
452 lit(format!("[\"{}\"]", key)),
453 ])
454 .eq(lit(value.as_i64().unwrap())),
455 );
456 }
457 if value.is_f64() {
458 filters.push(
459 dataframe
460 .registry()
461 .udf(JsonGetFloat {}.name())
462 .context(DataFusionSnafu)?
463 .call(vec![
464 col(SPAN_ATTRIBUTES_COLUMN),
465 lit(format!("[\"{}\"]", key)),
466 ])
467 .eq(lit(value.as_f64().unwrap())),
468 );
469 }
470 }
471 if let JsonValue::Bool(value) = value {
472 filters.push(
473 dataframe
474 .registry()
475 .udf(JsonGetBool {}.name())
476 .context(DataFusionSnafu)?
477 .call(vec![
478 col(SPAN_ATTRIBUTES_COLUMN),
479 lit(format!("[\"{}\"]", key)),
480 ])
481 .eq(lit(*value)),
482 );
483 }
484 }
485
486 Ok(filters)
487}
488
489fn flatten_tag_filters(tags: HashMap<String, JsonValue>) -> ServerResult<Vec<Expr>> {
490 let filters = tags
491 .into_iter()
492 .filter_map(|(key, value)| {
493 let key = format!("\"span_attributes.{}\"", key);
494 match value {
495 JsonValue::String(value) => Some(col(key).eq(lit(value))),
496 JsonValue::Number(value) => {
497 if value.is_f64() {
498 Some(col(key).eq(lit(value.as_f64().unwrap())))
500 } else {
501 Some(col(key).eq(lit(value.as_i64().unwrap())))
502 }
503 }
504 JsonValue::Bool(value) => Some(col(key).eq(lit(value))),
505 JsonValue::Null => Some(col(key).is_null()),
506 JsonValue::Array(_value) => None,
508 JsonValue::Object(_value) => None,
509 }
510 })
511 .collect();
512 Ok(filters)
513}
514
515fn tags_filters(
516 dataframe: &DataFrame,
517 tags: HashMap<String, JsonValue>,
518 is_data_model_v1: bool,
519) -> ServerResult<Vec<Expr>> {
520 if is_data_model_v1 {
521 flatten_tag_filters(tags)
522 } else {
523 json_tag_filters(dataframe, tags)
524 }
525}
526
527async fn trace_ids_from_output(output: Output) -> ServerResult<Vec<String>> {
529 if let OutputData::Stream(stream) = output.data {
530 let schema = stream.schema().clone();
531 let recordbatches = util::collect(stream)
532 .await
533 .context(CollectRecordbatchSnafu)?;
534
535 if !recordbatches.is_empty()
537 && schema.num_columns() == 1
538 && schema.contains_column(TRACE_ID_COLUMN)
539 {
540 let mut trace_ids = vec![];
541 for recordbatch in recordbatches {
542 for col in recordbatch.columns().iter() {
543 for row_idx in 0..recordbatch.num_rows() {
544 if let ValueRef::String(value) = col.get_ref(row_idx) {
545 trace_ids.push(value.to_string());
546 }
547 }
548 }
549 }
550
551 return Ok(trace_ids);
552 }
553 }
554
555 Ok(vec![])
556}