1use std::collections::HashMap;
16use std::sync::Arc;
17
18use async_trait::async_trait;
19use catalog::CatalogManagerRef;
20use common_catalog::consts::{trace_services_table_name, TRACE_TABLE_NAME};
21use common_function::function::{Function, FunctionRef};
22use common_function::scalars::json::json_get::{
23 JsonGetBool, JsonGetFloat, JsonGetInt, JsonGetString,
24};
25use common_function::scalars::udf::create_udf;
26use common_function::state::FunctionState;
27use common_query::{Output, OutputData};
28use common_recordbatch::adapter::RecordBatchStreamAdapter;
29use common_recordbatch::util;
30use datafusion::dataframe::DataFrame;
31use datafusion::execution::context::SessionContext;
32use datafusion::execution::SessionStateBuilder;
33use datafusion_expr::select_expr::SelectExpr;
34use datafusion_expr::{col, lit, lit_timestamp_nano, wildcard, Expr, SortExpr};
35use datatypes::value::ValueRef;
36use query::QueryEngineRef;
37use serde_json::Value as JsonValue;
38use servers::error::{
39 CatalogSnafu, CollectRecordbatchSnafu, DataFusionSnafu, Result as ServerResult,
40 TableNotFoundSnafu,
41};
42use servers::http::jaeger::{QueryTraceParams, JAEGER_QUERY_TABLE_NAME_KEY};
43use servers::otlp::trace::{
44 DURATION_NANO_COLUMN, SERVICE_NAME_COLUMN, SPAN_ATTRIBUTES_COLUMN, SPAN_KIND_COLUMN,
45 SPAN_KIND_PREFIX, SPAN_NAME_COLUMN, TIMESTAMP_COLUMN, TRACE_ID_COLUMN,
46};
47use servers::query_handler::JaegerQueryHandler;
48use session::context::QueryContextRef;
49use snafu::{OptionExt, ResultExt};
50use table::requests::{TABLE_DATA_MODEL, TABLE_DATA_MODEL_TRACE_V1};
51use table::table::adapter::DfTableProviderAdapter;
52
53use crate::instance::Instance;
54
/// Row cap applied to trace queries in this file when the caller does not
/// supply an explicit limit of its own.
const DEFAULT_LIMIT: usize = 2_000;
56
57#[async_trait]
58impl JaegerQueryHandler for Instance {
59 async fn get_services(&self, ctx: QueryContextRef) -> ServerResult<Output> {
60 Ok(query_trace_table(
62 ctx,
63 self.catalog_manager(),
64 self.query_engine(),
65 vec![SelectExpr::from(col(SERVICE_NAME_COLUMN))],
66 vec![],
67 vec![],
68 None,
69 None,
70 vec![col(SERVICE_NAME_COLUMN)],
71 )
72 .await?)
73 }
74
75 async fn get_operations(
76 &self,
77 ctx: QueryContextRef,
78 service_name: &str,
79 span_kind: Option<&str>,
80 start_time: Option<i64>,
81 end_time: Option<i64>,
82 ) -> ServerResult<Output> {
83 let mut filters = vec![col(SERVICE_NAME_COLUMN).eq(lit(service_name))];
84
85 if let Some(span_kind) = span_kind {
86 filters.push(col(SPAN_KIND_COLUMN).eq(lit(format!(
87 "{}{}",
88 SPAN_KIND_PREFIX,
89 span_kind.to_uppercase()
90 ))));
91 }
92
93 if let Some(start_time) = start_time {
94 filters.push(col(TIMESTAMP_COLUMN).gt_eq(lit_timestamp_nano(start_time * 1_000)));
96 }
97
98 if let Some(end_time) = end_time {
99 filters.push(col(TIMESTAMP_COLUMN).lt_eq(lit_timestamp_nano(end_time * 1_000)));
101 }
102
103 Ok(query_trace_table(
118 ctx,
119 self.catalog_manager(),
120 self.query_engine(),
121 vec![
122 SelectExpr::from(col(SPAN_NAME_COLUMN)),
123 SelectExpr::from(col(SPAN_KIND_COLUMN)),
124 SelectExpr::from(col(SERVICE_NAME_COLUMN)),
125 SelectExpr::from(col(TIMESTAMP_COLUMN)),
126 ],
127 filters,
128 vec![col(SPAN_NAME_COLUMN).sort(true, false)], Some(DEFAULT_LIMIT),
130 None,
131 vec![col(SPAN_NAME_COLUMN), col(SPAN_KIND_COLUMN)],
132 )
133 .await?)
134 }
135
136 async fn get_trace(
137 &self,
138 ctx: QueryContextRef,
139 trace_id: &str,
140 start_time: Option<i64>,
141 end_time: Option<i64>,
142 ) -> ServerResult<Output> {
143 let selects = vec![wildcard()];
158
159 let mut filters = vec![col(TRACE_ID_COLUMN).eq(lit(trace_id))];
160
161 if let Some(start_time) = start_time {
162 filters.push(col(TIMESTAMP_COLUMN).gt_eq(lit_timestamp_nano(start_time)));
163 }
164
165 if let Some(end_time) = end_time {
166 filters.push(col(TIMESTAMP_COLUMN).lt_eq(lit_timestamp_nano(end_time)));
167 }
168
169 Ok(query_trace_table(
170 ctx,
171 self.catalog_manager(),
172 self.query_engine(),
173 selects,
174 filters,
175 vec![col(TIMESTAMP_COLUMN).sort(false, false)], Some(DEFAULT_LIMIT),
177 None,
178 vec![],
179 )
180 .await?)
181 }
182
183 async fn find_traces(
184 &self,
185 ctx: QueryContextRef,
186 query_params: QueryTraceParams,
187 ) -> ServerResult<Output> {
188 let mut filters = vec![];
189
190 filters.push(col(SERVICE_NAME_COLUMN).eq(lit(query_params.service_name)));
192
193 if let Some(operation_name) = query_params.operation_name {
194 filters.push(col(SPAN_NAME_COLUMN).eq(lit(operation_name)));
195 }
196
197 if let Some(start_time) = query_params.start_time {
198 filters.push(col(TIMESTAMP_COLUMN).gt_eq(lit_timestamp_nano(start_time)));
199 }
200
201 if let Some(end_time) = query_params.end_time {
202 filters.push(col(TIMESTAMP_COLUMN).lt_eq(lit_timestamp_nano(end_time)));
203 }
204
205 if let Some(min_duration) = query_params.min_duration {
206 filters.push(col(DURATION_NANO_COLUMN).gt_eq(lit(min_duration)));
207 }
208
209 if let Some(max_duration) = query_params.max_duration {
210 filters.push(col(DURATION_NANO_COLUMN).lt_eq(lit(max_duration)));
211 }
212
213 let output = query_trace_table(
230 ctx.clone(),
231 self.catalog_manager(),
232 self.query_engine(),
233 vec![wildcard()],
234 filters,
235 vec![],
236 Some(query_params.limit.unwrap_or(DEFAULT_LIMIT)),
237 query_params.tags,
238 vec![col(TRACE_ID_COLUMN)],
239 )
240 .await?;
241
242 let mut filters = vec![col(TRACE_ID_COLUMN).in_list(
255 trace_ids_from_output(output)
256 .await?
257 .iter()
258 .map(lit)
259 .collect::<Vec<Expr>>(),
260 false,
261 )];
262
263 if let Some(start_time) = query_params.start_time {
264 filters.push(col(TIMESTAMP_COLUMN).gt_eq(lit_timestamp_nano(start_time)));
265 }
266
267 if let Some(end_time) = query_params.end_time {
268 filters.push(col(TIMESTAMP_COLUMN).lt_eq(lit_timestamp_nano(end_time)));
269 }
270
271 Ok(query_trace_table(
272 ctx,
273 self.catalog_manager(),
274 self.query_engine(),
275 vec![wildcard()],
276 filters,
277 vec![],
278 None,
279 None,
280 vec![],
281 )
282 .await?)
283 }
284}
285
286#[allow(clippy::too_many_arguments)]
287async fn query_trace_table(
288 ctx: QueryContextRef,
289 catalog_manager: &CatalogManagerRef,
290 query_engine: &QueryEngineRef,
291 selects: Vec<SelectExpr>,
292 filters: Vec<Expr>,
293 sorts: Vec<SortExpr>,
294 limit: Option<usize>,
295 tags: Option<HashMap<String, JsonValue>>,
296 distincts: Vec<Expr>,
297) -> ServerResult<Output> {
298 let trace_table_name = ctx
299 .extension(JAEGER_QUERY_TABLE_NAME_KEY)
300 .unwrap_or(TRACE_TABLE_NAME);
301
302 let table_name = {
304 if match selects.as_slice() {
305 [SelectExpr::Expression(x)] => x == &col(SERVICE_NAME_COLUMN),
306 _ => false,
307 } {
308 &trace_services_table_name(trace_table_name)
309 } else {
310 trace_table_name
311 }
312 };
313
314 let table = catalog_manager
315 .table(
316 ctx.current_catalog(),
317 &ctx.current_schema(),
318 table_name,
319 Some(&ctx),
320 )
321 .await
322 .context(CatalogSnafu)?
323 .with_context(|| TableNotFoundSnafu {
324 table: table_name,
325 catalog: ctx.current_catalog(),
326 schema: ctx.current_schema(),
327 })?;
328
329 let is_data_model_v1 = table
330 .table_info()
331 .meta
332 .options
333 .extra_options
334 .get(TABLE_DATA_MODEL)
335 .map(|s| s.as_str())
336 == Some(TABLE_DATA_MODEL_TRACE_V1);
337
338 let df_context = create_df_context(query_engine, ctx.clone())?;
339
340 let dataframe = df_context
341 .read_table(Arc::new(DfTableProviderAdapter::new(table)))
342 .context(DataFusionSnafu)?;
343
344 let dataframe = dataframe.select(selects).context(DataFusionSnafu)?;
345
346 let dataframe = filters
348 .into_iter()
349 .chain(tags.map_or(Ok(vec![]), |t| {
350 tags_filters(&dataframe, t, is_data_model_v1)
351 })?)
352 .try_fold(dataframe, |df, expr| {
353 df.filter(expr).context(DataFusionSnafu)
354 })?;
355
356 let dataframe = if !distincts.is_empty() {
358 dataframe
359 .distinct_on(distincts.clone(), distincts, None)
360 .context(DataFusionSnafu)?
361 } else {
362 dataframe
363 };
364
365 let dataframe = if !sorts.is_empty() {
367 dataframe.sort(sorts).context(DataFusionSnafu)?
368 } else {
369 dataframe
370 };
371
372 let dataframe = if let Some(limit) = limit {
374 dataframe.limit(0, Some(limit)).context(DataFusionSnafu)?
375 } else {
376 dataframe
377 };
378
379 let stream = dataframe.execute_stream().await.context(DataFusionSnafu)?;
381
382 let output = Output::new_with_stream(Box::pin(
383 RecordBatchStreamAdapter::try_new(stream).context(CollectRecordbatchSnafu)?,
384 ));
385
386 Ok(output)
387}
388
389fn create_df_context(
394 query_engine: &QueryEngineRef,
395 ctx: QueryContextRef,
396) -> ServerResult<SessionContext> {
397 let df_context = SessionContext::new_with_state(
398 SessionStateBuilder::new_from_existing(query_engine.engine_state().session_state()).build(),
399 );
400
401 let udfs: Vec<FunctionRef> = vec![
403 Arc::new(JsonGetInt),
404 Arc::new(JsonGetFloat),
405 Arc::new(JsonGetBool),
406 Arc::new(JsonGetString),
407 ];
408
409 for udf in udfs {
410 df_context.register_udf(create_udf(
411 udf,
412 ctx.clone(),
413 Arc::new(FunctionState::default()),
414 ));
415 }
416
417 Ok(df_context)
418}
419
420fn json_tag_filters(
421 dataframe: &DataFrame,
422 tags: HashMap<String, JsonValue>,
423) -> ServerResult<Vec<Expr>> {
424 let mut filters = vec![];
425
426 for (key, value) in tags.iter() {
428 if let JsonValue::String(value) = value {
429 filters.push(
430 dataframe
431 .registry()
432 .udf(JsonGetString {}.name())
433 .context(DataFusionSnafu)?
434 .call(vec![
435 col(SPAN_ATTRIBUTES_COLUMN),
436 lit(format!("[\"{}\"]", key)),
437 ])
438 .eq(lit(value)),
439 );
440 }
441 if let JsonValue::Number(value) = value {
442 if value.is_i64() {
443 filters.push(
444 dataframe
445 .registry()
446 .udf(JsonGetInt {}.name())
447 .context(DataFusionSnafu)?
448 .call(vec![
449 col(SPAN_ATTRIBUTES_COLUMN),
450 lit(format!("[\"{}\"]", key)),
451 ])
452 .eq(lit(value.as_i64().unwrap())),
453 );
454 }
455 if value.is_f64() {
456 filters.push(
457 dataframe
458 .registry()
459 .udf(JsonGetFloat {}.name())
460 .context(DataFusionSnafu)?
461 .call(vec![
462 col(SPAN_ATTRIBUTES_COLUMN),
463 lit(format!("[\"{}\"]", key)),
464 ])
465 .eq(lit(value.as_f64().unwrap())),
466 );
467 }
468 }
469 if let JsonValue::Bool(value) = value {
470 filters.push(
471 dataframe
472 .registry()
473 .udf(JsonGetBool {}.name())
474 .context(DataFusionSnafu)?
475 .call(vec![
476 col(SPAN_ATTRIBUTES_COLUMN),
477 lit(format!("[\"{}\"]", key)),
478 ])
479 .eq(lit(*value)),
480 );
481 }
482 }
483
484 Ok(filters)
485}
486
487fn flatten_tag_filters(tags: HashMap<String, JsonValue>) -> ServerResult<Vec<Expr>> {
488 let filters = tags
489 .into_iter()
490 .filter_map(|(key, value)| {
491 let key = format!("\"span_attributes.{}\"", key);
492 match value {
493 JsonValue::String(value) => Some(col(key).eq(lit(value))),
494 JsonValue::Number(value) => {
495 if value.is_f64() {
496 Some(col(key).eq(lit(value.as_f64().unwrap())))
498 } else {
499 Some(col(key).eq(lit(value.as_i64().unwrap())))
500 }
501 }
502 JsonValue::Bool(value) => Some(col(key).eq(lit(value))),
503 JsonValue::Null => Some(col(key).is_null()),
504 JsonValue::Array(_value) => None,
506 JsonValue::Object(_value) => None,
507 }
508 })
509 .collect();
510 Ok(filters)
511}
512
513fn tags_filters(
514 dataframe: &DataFrame,
515 tags: HashMap<String, JsonValue>,
516 is_data_model_v1: bool,
517) -> ServerResult<Vec<Expr>> {
518 if is_data_model_v1 {
519 flatten_tag_filters(tags)
520 } else {
521 json_tag_filters(dataframe, tags)
522 }
523}
524
525async fn trace_ids_from_output(output: Output) -> ServerResult<Vec<String>> {
527 if let OutputData::Stream(stream) = output.data {
528 let schema = stream.schema().clone();
529 let recordbatches = util::collect(stream)
530 .await
531 .context(CollectRecordbatchSnafu)?;
532
533 if !recordbatches.is_empty()
535 && schema.num_columns() == 1
536 && schema.contains_column(TRACE_ID_COLUMN)
537 {
538 let mut trace_ids = vec![];
539 for recordbatch in recordbatches {
540 for col in recordbatch.columns().iter() {
541 for row_idx in 0..recordbatch.num_rows() {
542 if let ValueRef::String(value) = col.get_ref(row_idx) {
543 trace_ids.push(value.to_string());
544 }
545 }
546 }
547 }
548
549 return Ok(trace_ids);
550 }
551 }
552
553 Ok(vec![])
554}