1use std::collections::HashMap;
16use std::sync::Arc;
17
18use async_trait::async_trait;
19use catalog::CatalogManagerRef;
20use common_catalog::consts::{trace_services_table_name, TRACE_TABLE_NAME};
21use common_function::function::{Function, FunctionRef};
22use common_function::scalars::json::json_get::{
23 JsonGetBool, JsonGetFloat, JsonGetInt, JsonGetString,
24};
25use common_function::scalars::udf::create_udf;
26use common_function::state::FunctionState;
27use common_query::{Output, OutputData};
28use common_recordbatch::adapter::RecordBatchStreamAdapter;
29use common_recordbatch::util;
30use datafusion::dataframe::DataFrame;
31use datafusion::execution::context::SessionContext;
32use datafusion::execution::SessionStateBuilder;
33use datafusion_expr::{col, lit, lit_timestamp_nano, wildcard, Expr, SortExpr};
34use datatypes::value::ValueRef;
35use query::QueryEngineRef;
36use serde_json::Value as JsonValue;
37use servers::error::{
38 CatalogSnafu, CollectRecordbatchSnafu, DataFusionSnafu, Result as ServerResult,
39 TableNotFoundSnafu,
40};
41use servers::http::jaeger::{QueryTraceParams, JAEGER_QUERY_TABLE_NAME_KEY};
42use servers::otlp::trace::{
43 DURATION_NANO_COLUMN, SERVICE_NAME_COLUMN, SPAN_ATTRIBUTES_COLUMN, SPAN_KIND_COLUMN,
44 SPAN_KIND_PREFIX, SPAN_NAME_COLUMN, TIMESTAMP_COLUMN, TRACE_ID_COLUMN,
45};
46use servers::query_handler::JaegerQueryHandler;
47use session::context::QueryContextRef;
48use snafu::{OptionExt, ResultExt};
49use table::requests::{TABLE_DATA_MODEL, TABLE_DATA_MODEL_TRACE_V1};
50use table::table::adapter::DfTableProviderAdapter;
51
52use crate::instance::Instance;
53
/// Default maximum number of rows a Jaeger query returns when the caller does
/// not supply an explicit limit.
const DEFAULT_LIMIT: usize = 2000;
55
56#[async_trait]
57impl JaegerQueryHandler for Instance {
58 async fn get_services(&self, ctx: QueryContextRef) -> ServerResult<Output> {
59 Ok(query_trace_table(
61 ctx,
62 self.catalog_manager(),
63 self.query_engine(),
64 vec![col(SERVICE_NAME_COLUMN)],
65 vec![],
66 vec![],
67 None,
68 None,
69 vec![col(SERVICE_NAME_COLUMN)],
70 )
71 .await?)
72 }
73
74 async fn get_operations(
75 &self,
76 ctx: QueryContextRef,
77 service_name: &str,
78 span_kind: Option<&str>,
79 start_time: Option<i64>,
80 end_time: Option<i64>,
81 ) -> ServerResult<Output> {
82 let mut filters = vec![col(SERVICE_NAME_COLUMN).eq(lit(service_name))];
83
84 if let Some(span_kind) = span_kind {
85 filters.push(col(SPAN_KIND_COLUMN).eq(lit(format!(
86 "{}{}",
87 SPAN_KIND_PREFIX,
88 span_kind.to_uppercase()
89 ))));
90 }
91
92 if let Some(start_time) = start_time {
93 filters.push(col(TIMESTAMP_COLUMN).gt_eq(lit_timestamp_nano(start_time * 1_000)));
95 }
96
97 if let Some(end_time) = end_time {
98 filters.push(col(TIMESTAMP_COLUMN).lt_eq(lit_timestamp_nano(end_time * 1_000)));
100 }
101
102 Ok(query_trace_table(
117 ctx,
118 self.catalog_manager(),
119 self.query_engine(),
120 vec![
121 col(SPAN_NAME_COLUMN),
122 col(SPAN_KIND_COLUMN),
123 col(SERVICE_NAME_COLUMN),
124 col(TIMESTAMP_COLUMN),
125 ],
126 filters,
127 vec![col(SPAN_NAME_COLUMN).sort(true, false)], Some(DEFAULT_LIMIT),
129 None,
130 vec![col(SPAN_NAME_COLUMN), col(SPAN_KIND_COLUMN)],
131 )
132 .await?)
133 }
134
135 async fn get_trace(
136 &self,
137 ctx: QueryContextRef,
138 trace_id: &str,
139 start_time: Option<i64>,
140 end_time: Option<i64>,
141 ) -> ServerResult<Output> {
142 let selects = vec![wildcard()];
157
158 let mut filters = vec![col(TRACE_ID_COLUMN).eq(lit(trace_id))];
159
160 if let Some(start_time) = start_time {
161 filters.push(col(TIMESTAMP_COLUMN).gt_eq(lit_timestamp_nano(start_time)));
162 }
163
164 if let Some(end_time) = end_time {
165 filters.push(col(TIMESTAMP_COLUMN).lt_eq(lit_timestamp_nano(end_time)));
166 }
167
168 Ok(query_trace_table(
169 ctx,
170 self.catalog_manager(),
171 self.query_engine(),
172 selects,
173 filters,
174 vec![col(TIMESTAMP_COLUMN).sort(false, false)], Some(DEFAULT_LIMIT),
176 None,
177 vec![],
178 )
179 .await?)
180 }
181
182 async fn find_traces(
183 &self,
184 ctx: QueryContextRef,
185 query_params: QueryTraceParams,
186 ) -> ServerResult<Output> {
187 let mut filters = vec![];
188
189 filters.push(col(SERVICE_NAME_COLUMN).eq(lit(query_params.service_name)));
191
192 if let Some(operation_name) = query_params.operation_name {
193 filters.push(col(SPAN_NAME_COLUMN).eq(lit(operation_name)));
194 }
195
196 if let Some(start_time) = query_params.start_time {
197 filters.push(col(TIMESTAMP_COLUMN).gt_eq(lit_timestamp_nano(start_time)));
198 }
199
200 if let Some(end_time) = query_params.end_time {
201 filters.push(col(TIMESTAMP_COLUMN).lt_eq(lit_timestamp_nano(end_time)));
202 }
203
204 if let Some(min_duration) = query_params.min_duration {
205 filters.push(col(DURATION_NANO_COLUMN).gt_eq(lit(min_duration)));
206 }
207
208 if let Some(max_duration) = query_params.max_duration {
209 filters.push(col(DURATION_NANO_COLUMN).lt_eq(lit(max_duration)));
210 }
211
212 let output = query_trace_table(
229 ctx.clone(),
230 self.catalog_manager(),
231 self.query_engine(),
232 vec![wildcard()],
233 filters,
234 vec![],
235 Some(query_params.limit.unwrap_or(DEFAULT_LIMIT)),
236 query_params.tags,
237 vec![col(TRACE_ID_COLUMN)],
238 )
239 .await?;
240
241 let mut filters = vec![col(TRACE_ID_COLUMN).in_list(
254 trace_ids_from_output(output)
255 .await?
256 .iter()
257 .map(lit)
258 .collect::<Vec<Expr>>(),
259 false,
260 )];
261
262 if let Some(start_time) = query_params.start_time {
263 filters.push(col(TIMESTAMP_COLUMN).gt_eq(lit_timestamp_nano(start_time)));
264 }
265
266 if let Some(end_time) = query_params.end_time {
267 filters.push(col(TIMESTAMP_COLUMN).lt_eq(lit_timestamp_nano(end_time)));
268 }
269
270 Ok(query_trace_table(
271 ctx,
272 self.catalog_manager(),
273 self.query_engine(),
274 vec![wildcard()],
275 filters,
276 vec![],
277 None,
278 None,
279 vec![],
280 )
281 .await?)
282 }
283}
284
285#[allow(clippy::too_many_arguments)]
286async fn query_trace_table(
287 ctx: QueryContextRef,
288 catalog_manager: &CatalogManagerRef,
289 query_engine: &QueryEngineRef,
290 selects: Vec<Expr>,
291 filters: Vec<Expr>,
292 sorts: Vec<SortExpr>,
293 limit: Option<usize>,
294 tags: Option<HashMap<String, JsonValue>>,
295 distincts: Vec<Expr>,
296) -> ServerResult<Output> {
297 let trace_table_name = ctx
298 .extension(JAEGER_QUERY_TABLE_NAME_KEY)
299 .unwrap_or(TRACE_TABLE_NAME);
300
301 let table_name = {
303 if selects.len() == 1 && selects[0] == col(SERVICE_NAME_COLUMN) {
304 &trace_services_table_name(trace_table_name)
305 } else {
306 trace_table_name
307 }
308 };
309
310 let table = catalog_manager
311 .table(
312 ctx.current_catalog(),
313 &ctx.current_schema(),
314 table_name,
315 Some(&ctx),
316 )
317 .await
318 .context(CatalogSnafu)?
319 .with_context(|| TableNotFoundSnafu {
320 table: table_name,
321 catalog: ctx.current_catalog(),
322 schema: ctx.current_schema(),
323 })?;
324
325 let is_data_model_v1 = table
326 .table_info()
327 .meta
328 .options
329 .extra_options
330 .get(TABLE_DATA_MODEL)
331 .map(|s| s.as_str())
332 == Some(TABLE_DATA_MODEL_TRACE_V1);
333
334 let df_context = create_df_context(query_engine, ctx.clone())?;
335
336 let dataframe = df_context
337 .read_table(Arc::new(DfTableProviderAdapter::new(table)))
338 .context(DataFusionSnafu)?;
339
340 let dataframe = dataframe.select(selects).context(DataFusionSnafu)?;
341
342 let dataframe = filters
344 .into_iter()
345 .chain(tags.map_or(Ok(vec![]), |t| {
346 tags_filters(&dataframe, t, is_data_model_v1)
347 })?)
348 .try_fold(dataframe, |df, expr| {
349 df.filter(expr).context(DataFusionSnafu)
350 })?;
351
352 let dataframe = if !distincts.is_empty() {
354 dataframe
355 .distinct_on(distincts.clone(), distincts, None)
356 .context(DataFusionSnafu)?
357 } else {
358 dataframe
359 };
360
361 let dataframe = if !sorts.is_empty() {
363 dataframe.sort(sorts).context(DataFusionSnafu)?
364 } else {
365 dataframe
366 };
367
368 let dataframe = if let Some(limit) = limit {
370 dataframe.limit(0, Some(limit)).context(DataFusionSnafu)?
371 } else {
372 dataframe
373 };
374
375 let stream = dataframe.execute_stream().await.context(DataFusionSnafu)?;
377
378 let output = Output::new_with_stream(Box::pin(
379 RecordBatchStreamAdapter::try_new(stream).context(CollectRecordbatchSnafu)?,
380 ));
381
382 Ok(output)
383}
384
385fn create_df_context(
390 query_engine: &QueryEngineRef,
391 ctx: QueryContextRef,
392) -> ServerResult<SessionContext> {
393 let df_context = SessionContext::new_with_state(
394 SessionStateBuilder::new_from_existing(query_engine.engine_state().session_state()).build(),
395 );
396
397 let udfs: Vec<FunctionRef> = vec![
399 Arc::new(JsonGetInt),
400 Arc::new(JsonGetFloat),
401 Arc::new(JsonGetBool),
402 Arc::new(JsonGetString),
403 ];
404
405 for udf in udfs {
406 df_context.register_udf(create_udf(
407 udf,
408 ctx.clone(),
409 Arc::new(FunctionState::default()),
410 ));
411 }
412
413 Ok(df_context)
414}
415
416fn json_tag_filters(
417 dataframe: &DataFrame,
418 tags: HashMap<String, JsonValue>,
419) -> ServerResult<Vec<Expr>> {
420 let mut filters = vec![];
421
422 for (key, value) in tags.iter() {
424 if let JsonValue::String(value) = value {
425 filters.push(
426 dataframe
427 .registry()
428 .udf(JsonGetString {}.name())
429 .context(DataFusionSnafu)?
430 .call(vec![
431 col(SPAN_ATTRIBUTES_COLUMN),
432 lit(format!("[\"{}\"]", key)),
433 ])
434 .eq(lit(value)),
435 );
436 }
437 if let JsonValue::Number(value) = value {
438 if value.is_i64() {
439 filters.push(
440 dataframe
441 .registry()
442 .udf(JsonGetInt {}.name())
443 .context(DataFusionSnafu)?
444 .call(vec![
445 col(SPAN_ATTRIBUTES_COLUMN),
446 lit(format!("[\"{}\"]", key)),
447 ])
448 .eq(lit(value.as_i64().unwrap())),
449 );
450 }
451 if value.is_f64() {
452 filters.push(
453 dataframe
454 .registry()
455 .udf(JsonGetFloat {}.name())
456 .context(DataFusionSnafu)?
457 .call(vec![
458 col(SPAN_ATTRIBUTES_COLUMN),
459 lit(format!("[\"{}\"]", key)),
460 ])
461 .eq(lit(value.as_f64().unwrap())),
462 );
463 }
464 }
465 if let JsonValue::Bool(value) = value {
466 filters.push(
467 dataframe
468 .registry()
469 .udf(JsonGetBool {}.name())
470 .context(DataFusionSnafu)?
471 .call(vec![
472 col(SPAN_ATTRIBUTES_COLUMN),
473 lit(format!("[\"{}\"]", key)),
474 ])
475 .eq(lit(*value)),
476 );
477 }
478 }
479
480 Ok(filters)
481}
482
483fn flatten_tag_filters(tags: HashMap<String, JsonValue>) -> ServerResult<Vec<Expr>> {
484 let filters = tags
485 .into_iter()
486 .filter_map(|(key, value)| {
487 let key = format!("\"span_attributes.{}\"", key);
488 match value {
489 JsonValue::String(value) => Some(col(key).eq(lit(value))),
490 JsonValue::Number(value) => {
491 if value.is_f64() {
492 Some(col(key).eq(lit(value.as_f64().unwrap())))
494 } else {
495 Some(col(key).eq(lit(value.as_i64().unwrap())))
496 }
497 }
498 JsonValue::Bool(value) => Some(col(key).eq(lit(value))),
499 JsonValue::Null => Some(col(key).is_null()),
500 JsonValue::Array(_value) => None,
502 JsonValue::Object(_value) => None,
503 }
504 })
505 .collect();
506 Ok(filters)
507}
508
509fn tags_filters(
510 dataframe: &DataFrame,
511 tags: HashMap<String, JsonValue>,
512 is_data_model_v1: bool,
513) -> ServerResult<Vec<Expr>> {
514 if is_data_model_v1 {
515 flatten_tag_filters(tags)
516 } else {
517 json_tag_filters(dataframe, tags)
518 }
519}
520
521async fn trace_ids_from_output(output: Output) -> ServerResult<Vec<String>> {
523 if let OutputData::Stream(stream) = output.data {
524 let schema = stream.schema().clone();
525 let recordbatches = util::collect(stream)
526 .await
527 .context(CollectRecordbatchSnafu)?;
528
529 if !recordbatches.is_empty()
531 && schema.num_columns() == 1
532 && schema.contains_column(TRACE_ID_COLUMN)
533 {
534 let mut trace_ids = vec![];
535 for recordbatch in recordbatches {
536 for col in recordbatch.columns().iter() {
537 for row_idx in 0..recordbatch.num_rows() {
538 if let ValueRef::String(value) = col.get_ref(row_idx) {
539 trace_ids.push(value.to_string());
540 }
541 }
542 }
543 }
544
545 return Ok(trace_ids);
546 }
547 }
548
549 Ok(vec![])
550}