1use std::collections::HashMap;
16use std::sync::Arc;
17
18use async_trait::async_trait;
19use catalog::CatalogManagerRef;
20use common_catalog::consts::{trace_services_table_name, TRACE_TABLE_NAME};
21use common_function::function::{Function, FunctionRef};
22use common_function::scalars::json::json_get::{
23 JsonGetBool, JsonGetFloat, JsonGetInt, JsonGetString,
24};
25use common_function::scalars::udf::create_udf;
26use common_function::state::FunctionState;
27use common_query::Output;
28use common_recordbatch::adapter::RecordBatchStreamAdapter;
29use datafusion::dataframe::DataFrame;
30use datafusion::execution::context::SessionContext;
31use datafusion::execution::SessionStateBuilder;
32use datafusion_expr::{col, lit, lit_timestamp_nano, wildcard, Expr, SortExpr};
33use query::QueryEngineRef;
34use serde_json::Value as JsonValue;
35use servers::error::{
36 CatalogSnafu, CollectRecordbatchSnafu, DataFusionSnafu, Result as ServerResult,
37 TableNotFoundSnafu,
38};
39use servers::http::jaeger::{QueryTraceParams, JAEGER_QUERY_TABLE_NAME_KEY};
40use servers::otlp::trace::{
41 DURATION_NANO_COLUMN, SERVICE_NAME_COLUMN, SPAN_ATTRIBUTES_COLUMN, SPAN_KIND_COLUMN,
42 SPAN_KIND_PREFIX, SPAN_NAME_COLUMN, TIMESTAMP_COLUMN, TRACE_ID_COLUMN,
43};
44use servers::query_handler::JaegerQueryHandler;
45use session::context::QueryContextRef;
46use snafu::{OptionExt, ResultExt};
47use table::requests::{TABLE_DATA_MODEL, TABLE_DATA_MODEL_TRACE_V1};
48use table::table::adapter::DfTableProviderAdapter;
49
50use crate::instance::Instance;
51
/// Default row limit applied to Jaeger queries that can return many spans
/// (operations listing, trace lookup, trace search).
const DEFAULT_LIMIT: usize = 2000;
53
54#[async_trait]
55impl JaegerQueryHandler for Instance {
56 async fn get_services(&self, ctx: QueryContextRef) -> ServerResult<Output> {
57 Ok(query_trace_table(
59 ctx,
60 self.catalog_manager(),
61 self.query_engine(),
62 vec![col(SERVICE_NAME_COLUMN)],
63 vec![],
64 vec![],
65 None,
66 None,
67 vec![col(SERVICE_NAME_COLUMN)],
68 )
69 .await?)
70 }
71
72 async fn get_operations(
73 &self,
74 ctx: QueryContextRef,
75 service_name: &str,
76 span_kind: Option<&str>,
77 start_time: Option<i64>,
78 end_time: Option<i64>,
79 ) -> ServerResult<Output> {
80 let mut filters = vec![col(SERVICE_NAME_COLUMN).eq(lit(service_name))];
81
82 if let Some(span_kind) = span_kind {
83 filters.push(col(SPAN_KIND_COLUMN).eq(lit(format!(
84 "{}{}",
85 SPAN_KIND_PREFIX,
86 span_kind.to_uppercase()
87 ))));
88 }
89
90 if let Some(start_time) = start_time {
91 filters.push(col(TIMESTAMP_COLUMN).gt_eq(lit_timestamp_nano(start_time * 1_000)));
93 }
94
95 if let Some(end_time) = end_time {
96 filters.push(col(TIMESTAMP_COLUMN).lt_eq(lit_timestamp_nano(end_time * 1_000)));
98 }
99
100 Ok(query_trace_table(
115 ctx,
116 self.catalog_manager(),
117 self.query_engine(),
118 vec![
119 col(SPAN_NAME_COLUMN),
120 col(SPAN_KIND_COLUMN),
121 col(SERVICE_NAME_COLUMN),
122 col(TIMESTAMP_COLUMN),
123 ],
124 filters,
125 vec![col(SPAN_NAME_COLUMN).sort(true, false)], Some(DEFAULT_LIMIT),
127 None,
128 vec![col(SPAN_NAME_COLUMN), col(SPAN_KIND_COLUMN)],
129 )
130 .await?)
131 }
132
133 async fn get_trace(&self, ctx: QueryContextRef, trace_id: &str) -> ServerResult<Output> {
134 let selects = vec![wildcard()];
147
148 let filters = vec![col(TRACE_ID_COLUMN).eq(lit(trace_id))];
149
150 Ok(query_trace_table(
151 ctx,
152 self.catalog_manager(),
153 self.query_engine(),
154 selects,
155 filters,
156 vec![col(TIMESTAMP_COLUMN).sort(false, false)], Some(DEFAULT_LIMIT),
158 None,
159 vec![],
160 )
161 .await?)
162 }
163
164 async fn find_traces(
165 &self,
166 ctx: QueryContextRef,
167 query_params: QueryTraceParams,
168 ) -> ServerResult<Output> {
169 let selects = vec![wildcard()];
170
171 let mut filters = vec![];
172
173 if let Some(operation_name) = query_params.operation_name {
174 filters.push(col(SPAN_NAME_COLUMN).eq(lit(operation_name)));
175 }
176
177 if let Some(start_time) = query_params.start_time {
178 filters.push(col(TIMESTAMP_COLUMN).gt_eq(lit_timestamp_nano(start_time)));
179 }
180
181 if let Some(end_time) = query_params.end_time {
182 filters.push(col(TIMESTAMP_COLUMN).lt_eq(lit_timestamp_nano(end_time)));
183 }
184
185 if let Some(min_duration) = query_params.min_duration {
186 filters.push(col(DURATION_NANO_COLUMN).gt_eq(lit(min_duration)));
187 }
188
189 if let Some(max_duration) = query_params.max_duration {
190 filters.push(col(DURATION_NANO_COLUMN).lt_eq(lit(max_duration)));
191 }
192
193 Ok(query_trace_table(
194 ctx,
195 self.catalog_manager(),
196 self.query_engine(),
197 selects,
198 filters,
199 vec![col(TIMESTAMP_COLUMN).sort(false, false)], Some(DEFAULT_LIMIT),
201 query_params.tags,
202 vec![],
203 )
204 .await?)
205 }
206}
207
/// Runs a query against the trace table (or the derived trace-services table)
/// and returns the results as a record-batch stream wrapped in [`Output`].
///
/// Pipeline: select -> filter (explicit filters plus tag-derived filters) ->
/// optional DISTINCT ON -> optional sort -> optional limit -> stream.
#[allow(clippy::too_many_arguments)]
async fn query_trace_table(
    ctx: QueryContextRef,
    catalog_manager: &CatalogManagerRef,
    query_engine: &QueryEngineRef,
    selects: Vec<Expr>,
    filters: Vec<Expr>,
    sorts: Vec<SortExpr>,
    limit: Option<usize>,
    tags: Option<HashMap<String, JsonValue>>,
    distincts: Vec<Expr>,
) -> ServerResult<Output> {
    // The table name can be overridden per request via a query-context
    // extension; otherwise the default trace table name is used.
    let trace_table_name = ctx
        .extension(JAEGER_QUERY_TABLE_NAME_KEY)
        .unwrap_or(TRACE_TABLE_NAME);

    let table_name = {
        // Selecting exactly `service_name` (as `get_services` does) routes the
        // query to the derived trace-services table. The temporary created by
        // `trace_services_table_name` is kept alive by lifetime extension of
        // the `let` initializer's tail expression.
        if selects.len() == 1 && selects[0] == col(SERVICE_NAME_COLUMN) {
            &trace_services_table_name(trace_table_name)
        } else {
            trace_table_name
        }
    };

    let table = catalog_manager
        .table(
            ctx.current_catalog(),
            &ctx.current_schema(),
            table_name,
            Some(&ctx),
        )
        .await
        .context(CatalogSnafu)?
        .with_context(|| TableNotFoundSnafu {
            table: table_name,
            catalog: ctx.current_catalog(),
            schema: ctx.current_schema(),
        })?;

    // Whether the table uses the flattened "trace v1" data model; this decides
    // how tag filters are expressed (flat columns vs. JSON lookups) below.
    let is_data_model_v1 = table
        .table_info()
        .meta
        .options
        .extra_options
        .get(TABLE_DATA_MODEL)
        .map(|s| s.as_str())
        == Some(TABLE_DATA_MODEL_TRACE_V1);

    // A dedicated session context with the `json_get_*` UDFs registered.
    let df_context = create_df_context(query_engine, ctx.clone())?;

    let dataframe = df_context
        .read_table(Arc::new(DfTableProviderAdapter::new(table)))
        .context(DataFusionSnafu)?;

    let dataframe = dataframe.select(selects).context(DataFusionSnafu)?;

    // Apply the explicit filters followed by any tag-derived filters; `?`
    // propagates the first failure from either building or applying a filter.
    let dataframe = filters
        .into_iter()
        .chain(tags.map_or(Ok(vec![]), |t| {
            tags_filters(&dataframe, t, is_data_model_v1)
        })?)
        .try_fold(dataframe, |df, expr| {
            df.filter(expr).context(DataFusionSnafu)
        })?;

    // DISTINCT ON the given expressions (also used as the projection).
    let dataframe = if !distincts.is_empty() {
        dataframe
            .distinct_on(distincts.clone(), distincts, None)
            .context(DataFusionSnafu)?
    } else {
        dataframe
    };

    let dataframe = if !sorts.is_empty() {
        dataframe.sort(sorts).context(DataFusionSnafu)?
    } else {
        dataframe
    };

    let dataframe = if let Some(limit) = limit {
        dataframe.limit(0, Some(limit)).context(DataFusionSnafu)?
    } else {
        dataframe
    };

    // Stream the results instead of materializing all batches at once.
    let stream = dataframe.execute_stream().await.context(DataFusionSnafu)?;

    let output = Output::new_with_stream(Box::pin(
        RecordBatchStreamAdapter::try_new(stream).context(CollectRecordbatchSnafu)?,
    ));

    Ok(output)
}
307
308fn create_df_context(
313 query_engine: &QueryEngineRef,
314 ctx: QueryContextRef,
315) -> ServerResult<SessionContext> {
316 let df_context = SessionContext::new_with_state(
317 SessionStateBuilder::new_from_existing(query_engine.engine_state().session_state()).build(),
318 );
319
320 let udfs: Vec<FunctionRef> = vec![
322 Arc::new(JsonGetInt),
323 Arc::new(JsonGetFloat),
324 Arc::new(JsonGetBool),
325 Arc::new(JsonGetString),
326 ];
327
328 for udf in udfs {
329 df_context.register_udf(create_udf(
330 udf,
331 ctx.clone(),
332 Arc::new(FunctionState::default()),
333 ));
334 }
335
336 Ok(df_context)
337}
338
339fn json_tag_filters(
340 dataframe: &DataFrame,
341 tags: HashMap<String, JsonValue>,
342) -> ServerResult<Vec<Expr>> {
343 let mut filters = vec![];
344
345 for (key, value) in tags.iter() {
347 if let JsonValue::String(value) = value {
348 filters.push(
349 dataframe
350 .registry()
351 .udf(JsonGetString {}.name())
352 .context(DataFusionSnafu)?
353 .call(vec![
354 col(SPAN_ATTRIBUTES_COLUMN),
355 lit(format!("[\"{}\"]", key)),
356 ])
357 .eq(lit(value)),
358 );
359 }
360 if let JsonValue::Number(value) = value {
361 if value.is_i64() {
362 filters.push(
363 dataframe
364 .registry()
365 .udf(JsonGetInt {}.name())
366 .context(DataFusionSnafu)?
367 .call(vec![
368 col(SPAN_ATTRIBUTES_COLUMN),
369 lit(format!("[\"{}\"]", key)),
370 ])
371 .eq(lit(value.as_i64().unwrap())),
372 );
373 }
374 if value.is_f64() {
375 filters.push(
376 dataframe
377 .registry()
378 .udf(JsonGetFloat {}.name())
379 .context(DataFusionSnafu)?
380 .call(vec![
381 col(SPAN_ATTRIBUTES_COLUMN),
382 lit(format!("[\"{}\"]", key)),
383 ])
384 .eq(lit(value.as_f64().unwrap())),
385 );
386 }
387 }
388 if let JsonValue::Bool(value) = value {
389 filters.push(
390 dataframe
391 .registry()
392 .udf(JsonGetBool {}.name())
393 .context(DataFusionSnafu)?
394 .call(vec![
395 col(SPAN_ATTRIBUTES_COLUMN),
396 lit(format!("[\"{}\"]", key)),
397 ])
398 .eq(lit(*value)),
399 );
400 }
401 }
402
403 Ok(filters)
404}
405
406fn flatten_tag_filters(tags: HashMap<String, JsonValue>) -> ServerResult<Vec<Expr>> {
407 let filters = tags
408 .into_iter()
409 .filter_map(|(key, value)| {
410 let key = format!("\"span_attributes.{}\"", key);
411 match value {
412 JsonValue::String(value) => Some(col(key).eq(lit(value))),
413 JsonValue::Number(value) => {
414 if value.is_f64() {
415 Some(col(key).eq(lit(value.as_f64().unwrap())))
417 } else {
418 Some(col(key).eq(lit(value.as_i64().unwrap())))
419 }
420 }
421 JsonValue::Bool(value) => Some(col(key).eq(lit(value))),
422 JsonValue::Null => Some(col(key).is_null()),
423 JsonValue::Array(_value) => None,
425 JsonValue::Object(_value) => None,
426 }
427 })
428 .collect();
429 Ok(filters)
430}
431
432fn tags_filters(
433 dataframe: &DataFrame,
434 tags: HashMap<String, JsonValue>,
435 is_data_model_v1: bool,
436) -> ServerResult<Vec<Expr>> {
437 if is_data_model_v1 {
438 flatten_tag_filters(tags)
439 } else {
440 json_tag_filters(dataframe, tags)
441 }
442}