1use arrow_schema::{DataType, Schema as ArrowSchema};
16use catalog::table_source::DfTableSourceProvider;
17use common_function::utils::escape_like_pattern;
18use datafusion::datasource::DefaultTableSource;
19use datafusion::execution::SessionState;
20use datafusion_common::{DFSchema, ScalarValue};
21use datafusion_expr::utils::{conjunction, disjunction};
22use datafusion_expr::{
23 BinaryExpr, Expr, ExprSchemable, LogicalPlan, LogicalPlanBuilder, Operator, col, lit, not,
24};
25use datafusion_sql::TableReference;
26use datatypes::schema::Schema;
27use log_query::{AggFunc, BinaryOperator, EqualValue, LogExpr, LogQuery, TimeFilter};
28use snafu::{OptionExt, ResultExt};
29use table::table::adapter::DfTableProviderAdapter;
30
31use crate::log_query::error::{
32 CatalogSnafu, DataFusionPlanningSnafu, Result, TimeIndexNotFoundSnafu, UnexpectedLogExprSnafu,
33 UnimplementedSnafu, UnknownAggregateFunctionSnafu, UnknownScalarFunctionSnafu,
34 UnknownTableSnafu,
35};
36
37const DEFAULT_LIMIT: usize = 1000;
38
/// Planner that converts a [`LogQuery`] into a DataFusion [`LogicalPlan`].
pub struct LogQueryPlanner {
    // Resolves table references against the catalog.
    table_provider: DfTableSourceProvider,
    // DataFusion session state; used to look up scalar/aggregate functions.
    session_state: SessionState,
}
43
44impl LogQueryPlanner {
45 pub fn new(table_provider: DfTableSourceProvider, session_state: SessionState) -> Self {
46 Self {
47 table_provider,
48 session_state,
49 }
50 }
51
52 pub async fn query_to_plan(&mut self, query: LogQuery) -> Result<LogicalPlan> {
53 let table_ref: TableReference = query.table.table_ref().into();
55 let table_source = self
56 .table_provider
57 .resolve_table(table_ref.clone())
58 .await
59 .context(CatalogSnafu)?;
60 let schema = table_source
61 .as_any()
62 .downcast_ref::<DefaultTableSource>()
63 .context(UnknownTableSnafu)?
64 .table_provider
65 .as_any()
66 .downcast_ref::<DfTableProviderAdapter>()
67 .context(UnknownTableSnafu)?
68 .table()
69 .schema();
70
71 let mut plan_builder = LogicalPlanBuilder::scan(table_ref, table_source, None)
73 .context(DataFusionPlanningSnafu)?;
74 let df_schema = plan_builder.schema().clone();
75
76 let mut filters = Vec::new();
78
79 filters.push(self.build_time_filter(&query.time_filter, &schema)?);
81
82 if let Some(filters_expr) = self.build_filters(&query.filters, df_schema.as_arrow())? {
83 filters.push(filters_expr);
84 }
85
86 if !filters.is_empty() {
88 let filter_expr = filters.into_iter().reduce(|a, b| a.and(b)).unwrap();
89 plan_builder = plan_builder
90 .filter(filter_expr)
91 .context(DataFusionPlanningSnafu)?;
92 }
93
94 if !query.columns.is_empty() {
96 let projected_columns = query.columns.iter().map(col).collect::<Vec<_>>();
97 plan_builder = plan_builder
98 .project(projected_columns)
99 .context(DataFusionPlanningSnafu)?;
100 }
101
102 plan_builder = plan_builder
104 .limit(
105 query.limit.skip.unwrap_or(0),
106 Some(query.limit.fetch.unwrap_or(DEFAULT_LIMIT)),
107 )
108 .context(DataFusionPlanningSnafu)?;
109
110 for expr in &query.exprs {
112 plan_builder = self.process_log_expr(plan_builder, expr)?;
113 }
114
115 let plan = plan_builder.build().context(DataFusionPlanningSnafu)?;
117
118 Ok(plan)
119 }
120
121 fn build_time_filter(&self, time_filter: &TimeFilter, schema: &Schema) -> Result<Expr> {
122 let timestamp_col = schema
123 .timestamp_column()
124 .with_context(|| TimeIndexNotFoundSnafu {})?
125 .name
126 .clone();
127
128 let start_time = ScalarValue::Utf8(time_filter.start.clone());
129 let end_time = ScalarValue::Utf8(
130 time_filter
131 .end
132 .clone()
133 .or(Some("9999-12-31T23:59:59Z".to_string())),
134 );
135 let expr = col(timestamp_col.clone())
136 .gt_eq(lit(start_time))
137 .and(col(timestamp_col).lt_eq(lit(end_time)));
138
139 Ok(expr)
140 }
141 fn build_filters(
143 &self,
144 filters: &log_query::Filters,
145 schema: &ArrowSchema,
146 ) -> Result<Option<Expr>> {
147 match filters {
148 log_query::Filters::And(filters) => {
149 let exprs = filters
150 .iter()
151 .filter_map(|filter| self.build_filters(filter, schema).transpose())
152 .try_collect::<Vec<_>>()?;
153 if exprs.is_empty() {
154 Ok(None)
155 } else {
156 Ok(conjunction(exprs))
157 }
158 }
159 log_query::Filters::Or(filters) => {
160 let exprs = filters
161 .iter()
162 .filter_map(|filter| self.build_filters(filter, schema).transpose())
163 .try_collect::<Vec<_>>()?;
164 if exprs.is_empty() {
165 Ok(None)
166 } else {
167 Ok(disjunction(exprs))
168 }
169 }
170 log_query::Filters::Not(filter) => {
171 if let Some(expr) = self.build_filters(filter, schema)? {
172 Ok(Some(not(expr)))
173 } else {
174 Ok(None)
175 }
176 }
177 log_query::Filters::Single(column_filters) => {
178 self.build_column_filter(column_filters, schema)
180 }
181 }
182 }
183
184 fn build_column_filter(
186 &self,
187 column_filter: &log_query::ColumnFilters,
188 schema: &ArrowSchema,
189 ) -> Result<Option<Expr>> {
190 let df_schema = DFSchema::try_from(schema.clone()).context(DataFusionPlanningSnafu)?;
192 let col_expr = self.log_expr_to_df_expr(&column_filter.expr, &df_schema)?;
193
194 let filter_exprs = column_filter
195 .filters
196 .iter()
197 .filter_map(|filter| {
198 self.build_content_filter_with_expr(col_expr.clone(), filter, &df_schema)
199 .transpose()
200 })
201 .try_collect::<Vec<_>>()?;
202
203 if filter_exprs.is_empty() {
204 return Ok(Some(col_expr.is_true()));
205 }
206
207 Ok(conjunction(filter_exprs))
209 }
210
    /// Lowers a single [`ContentFilter`] applied to `col_expr` into a
    /// DataFusion predicate. Returns `Ok(None)` only for an empty compound
    /// filter; returns an error for unimplemented filter kinds (regex).
    ///
    /// User-supplied match strings are passed through `escape_like_pattern`
    /// so `%`/`_` match literally in the generated `LIKE` patterns.
    #[allow(clippy::only_used_in_recursion)]
    fn build_content_filter_with_expr(
        &self,
        col_expr: Expr,
        filter: &log_query::ContentFilter,
        schema: &DFSchema,
    ) -> Result<Option<Expr>> {
        match filter {
            // Exact match, expressed as LIKE with escaped wildcards.
            log_query::ContentFilter::Exact(value) => Ok(Some(
                col_expr.like(lit(ScalarValue::Utf8(Some(escape_like_pattern(value))))),
            )),
            // LIKE 'value%'
            log_query::ContentFilter::Prefix(value) => Ok(Some(col_expr.like(lit(
                ScalarValue::Utf8(Some(format!("{}%", escape_like_pattern(value)))),
            )))),
            // LIKE '%value'
            log_query::ContentFilter::Postfix(value) => Ok(Some(col_expr.like(lit(
                ScalarValue::Utf8(Some(format!("%{}", escape_like_pattern(value)))),
            )))),
            // LIKE '%value%'
            log_query::ContentFilter::Contains(value) => Ok(Some(col_expr.like(lit(
                ScalarValue::Utf8(Some(format!("%{}%", escape_like_pattern(value)))),
            )))),
            log_query::ContentFilter::Regex(_pattern) => Err(UnimplementedSnafu {
                feature: "regex filter",
            }
            .build()),
            log_query::ContentFilter::Exist => Ok(Some(col_expr.is_not_null())),
            log_query::ContentFilter::Between {
                start,
                end,
                start_inclusive,
                end_inclusive,
            } => {
                // Bounds are cast toward the column's type where possible;
                // see `create_inferred_literal`.
                let start_literal = self.create_inferred_literal(start, &col_expr, schema);
                let end_literal = self.create_inferred_literal(end, &col_expr, schema);

                let left = if *start_inclusive {
                    col_expr.clone().gt_eq(start_literal)
                } else {
                    col_expr.clone().gt(start_literal)
                };
                let right = if *end_inclusive {
                    col_expr.lt_eq(end_literal)
                } else {
                    col_expr.lt(end_literal)
                };
                Ok(Some(left.and(right)))
            }
            log_query::ContentFilter::GreatThan { value, inclusive } => {
                let value_literal = self.create_inferred_literal(value, &col_expr, schema);
                let comparison_expr = if *inclusive {
                    col_expr.gt_eq(value_literal)
                } else {
                    col_expr.gt(value_literal)
                };
                Ok(Some(comparison_expr))
            }
            log_query::ContentFilter::LessThan { value, inclusive } => {
                let value_literal = self.create_inferred_literal(value, &col_expr, schema);
                if *inclusive {
                    Ok(Some(col_expr.lt_eq(value_literal)))
                } else {
                    Ok(Some(col_expr.lt(value_literal)))
                }
            }
            log_query::ContentFilter::In(values) => {
                let inferred_values: Vec<_> = values
                    .iter()
                    .map(|v| self.create_inferred_literal(v, &col_expr, schema))
                    .collect();
                Ok(Some(col_expr.in_list(inferred_values, false)))
            }
            log_query::ContentFilter::IsTrue => Ok(Some(col_expr.is_true())),
            log_query::ContentFilter::IsFalse => Ok(Some(col_expr.is_false())),
            // Typed equality: the literal keeps its own type (string, int,
            // float, bool, uint) rather than being inferred from the column.
            log_query::ContentFilter::Equal(value) => {
                let value_literal = Self::create_eq_literal(value.clone());
                Ok(Some(col_expr.eq(value_literal)))
            }
            // Nested filters joined by AND/OR; `None` sub-results are skipped.
            log_query::ContentFilter::Compound(filters, op) => {
                let exprs = filters
                    .iter()
                    .filter_map(|filter| {
                        self.build_content_filter_with_expr(col_expr.clone(), filter, schema)
                            .transpose()
                    })
                    .try_collect::<Vec<_>>()?;

                if exprs.is_empty() {
                    return Ok(None);
                }

                match op {
                    log_query::ConjunctionOperator::And => Ok(conjunction(exprs)),
                    log_query::ConjunctionOperator::Or => {
                        Ok(exprs.into_iter().reduce(|a, b| a.or(b)))
                    }
                }
            }
        }
    }
311
312 fn build_aggr_func(
313 &self,
314 schema: &DFSchema,
315 expr: &[AggFunc],
316 by: &[LogExpr],
317 ) -> Result<(Vec<Expr>, Vec<Expr>)> {
318 let aggr_expr = expr
319 .iter()
320 .map(|agg_func| {
321 let AggFunc {
322 name: fn_name,
323 args,
324 alias,
325 } = agg_func;
326 let aggr_fn = self
327 .session_state
328 .aggregate_functions()
329 .get(fn_name)
330 .context(UnknownAggregateFunctionSnafu {
331 name: fn_name.to_string(),
332 })?;
333 let args = args
334 .iter()
335 .map(|expr| self.log_expr_to_df_expr(expr, schema))
336 .try_collect::<Vec<_>>()?;
337 if let Some(alias) = alias {
338 Ok(aggr_fn.call(args).alias(alias))
339 } else {
340 Ok(aggr_fn.call(args))
341 }
342 })
343 .try_collect::<Vec<_>>()?;
344
345 let group_exprs = by
346 .iter()
347 .map(|expr| self.log_expr_to_df_expr(expr, schema))
348 .try_collect::<Vec<_>>()?;
349
350 Ok((aggr_expr, group_exprs))
351 }
352
    /// Converts a [`LogExpr`] into a DataFusion [`Expr`].
    ///
    /// Only value-level expressions are accepted here; plan-level constructs
    /// (`AggrFunc`, `Filter`, `Decompose`) are rejected and must be handled
    /// by `process_log_expr`.
    fn log_expr_to_df_expr(&self, expr: &LogExpr, schema: &DFSchema) -> Result<Expr> {
        match expr {
            LogExpr::NamedIdent(name) => Ok(col(name)),
            // Positional identifiers index into the current schema's fields.
            LogExpr::PositionalIdent(index) => Ok(col(schema.field(*index).name())),
            // Literals start life as UTF-8; type inference happens at use
            // sites (see `build_binary_expr` / `create_inferred_literal`).
            LogExpr::Literal(literal) => Ok(lit(ScalarValue::Utf8(Some(literal.clone())))),
            LogExpr::BinaryOp { left, op, right } => {
                self.build_binary_expr(left, op, right, schema)
            }
            LogExpr::ScalarFunc { name, args, alias } => {
                self.build_scalar_func(schema, name, args, alias)
            }
            LogExpr::Alias { expr, alias } => {
                let df_expr = self.log_expr_to_df_expr(expr, schema)?;
                Ok(df_expr.alias(alias))
            }
            LogExpr::AggrFunc { .. } | LogExpr::Filter { .. } | LogExpr::Decompose { .. } => {
                UnexpectedLogExprSnafu {
                    expr: expr.clone(),
                    expected: "not a typical expression",
                }
                .fail()
            }
        }
    }
379
380 fn build_scalar_func(
381 &self,
382 schema: &DFSchema,
383 name: &str,
384 args: &[LogExpr],
385 alias: &Option<String>,
386 ) -> Result<Expr> {
387 let args = args
388 .iter()
389 .map(|expr| self.log_expr_to_df_expr(expr, schema))
390 .try_collect::<Vec<_>>()?;
391 let func = self.session_state.scalar_functions().get(name).context(
392 UnknownScalarFunctionSnafu {
393 name: name.to_string(),
394 },
395 )?;
396 let expr = func.call(args);
397
398 if let Some(alias) = alias {
399 Ok(expr.alias(alias))
400 } else {
401 Ok(expr)
402 }
403 }
404
    /// Maps a log-query [`BinaryOperator`] onto the equivalent DataFusion
    /// [`Operator`]. The mapping is total: every variant is covered.
    fn binary_operator_to_df_operator(op: &BinaryOperator) -> Operator {
        match op {
            BinaryOperator::Eq => Operator::Eq,
            BinaryOperator::Ne => Operator::NotEq,
            BinaryOperator::Lt => Operator::Lt,
            BinaryOperator::Le => Operator::LtEq,
            BinaryOperator::Gt => Operator::Gt,
            BinaryOperator::Ge => Operator::GtEq,
            BinaryOperator::Plus => Operator::Plus,
            BinaryOperator::Minus => Operator::Minus,
            BinaryOperator::Multiply => Operator::Multiply,
            BinaryOperator::Divide => Operator::Divide,
            BinaryOperator::Modulo => Operator::Modulo,
            BinaryOperator::And => Operator::And,
            BinaryOperator::Or => Operator::Or,
        }
    }
423
424 fn infer_literal_scalar_value(&self, literal: &str, target_type: &DataType) -> ScalarValue {
427 let utf8_literal = ScalarValue::Utf8(Some(literal.to_string()));
428 utf8_literal.cast_to(target_type).unwrap_or(utf8_literal)
429 }
430
    /// Builds a binary expression, inferring the type of a literal operand
    /// from the other (non-literal) side when its type can be resolved.
    fn build_binary_expr(
        &self,
        left: &LogExpr,
        op: &BinaryOperator,
        right: &LogExpr,
        schema: &DFSchema,
    ) -> Result<Expr> {
        let mut left_expr = self.log_expr_to_df_expr(left, schema)?;
        let mut right_expr = self.log_expr_to_df_expr(right, schema)?;

        // When exactly one side is a literal, try to cast it to the other
        // side's resolved type so the comparison is well-typed.
        match (left, right) {
            (LogExpr::Literal(_), LogExpr::Literal(_)) => {
                // Both sides are literals: no column type to infer from, so
                // both stay UTF-8.
            }
            (LogExpr::Literal(literal), _) => {
                if let Ok(right_type) = right_expr.get_type(schema) {
                    let inferred_scalar = self.infer_literal_scalar_value(literal, &right_type);
                    left_expr = lit(inferred_scalar);
                }
            }
            (_, LogExpr::Literal(literal)) => {
                if let Ok(left_type) = left_expr.get_type(schema) {
                    let inferred_scalar = self.infer_literal_scalar_value(literal, &left_type);
                    right_expr = lit(inferred_scalar);
                }
            }
            _ => {
                // Neither side is a literal; no inference needed.
            }
        }

        let df_op = Self::binary_operator_to_df_operator(op);
        Ok(Expr::BinaryExpr(BinaryExpr {
            left: Box::new(left_expr),
            op: df_op,
            right: Box::new(right_expr),
        }))
    }
475
476 fn create_inferred_literal(&self, value: &str, expr: &Expr, schema: &DFSchema) -> Expr {
479 if let Ok(expr_type) = expr.get_type(schema) {
480 lit(self.infer_literal_scalar_value(value, &expr_type))
481 } else {
482 lit(ScalarValue::Utf8(Some(value.to_string())))
483 }
484 }
485
    /// Builds a typed literal for an equality comparison; unlike the inferred
    /// literals, the value carries its own type from the query.
    fn create_eq_literal(value: EqualValue) -> Expr {
        match value {
            EqualValue::String(s) => lit(ScalarValue::Utf8(Some(s))),
            EqualValue::Float(n) => lit(ScalarValue::Float64(Some(n))),
            EqualValue::Int(n) => lit(ScalarValue::Int64(Some(n))),
            EqualValue::Boolean(b) => lit(ScalarValue::Boolean(Some(b))),
            EqualValue::UInt(n) => lit(ScalarValue::UInt64(Some(n))),
        }
    }
495
496 fn process_log_expr(
500 &self,
501 plan_builder: LogicalPlanBuilder,
502 expr: &LogExpr,
503 ) -> Result<LogicalPlanBuilder> {
504 let mut plan_builder = plan_builder;
505
506 match expr {
507 LogExpr::AggrFunc { expr, by } => {
508 let schema = plan_builder.schema();
509 let (aggr_expr, group_exprs) = self.build_aggr_func(schema, expr, by)?;
510
511 plan_builder = plan_builder
512 .aggregate(group_exprs, aggr_expr)
513 .context(DataFusionPlanningSnafu)?;
514 }
515 LogExpr::Filter { filter } => {
516 let schema = plan_builder.schema();
517 if let Some(filter_expr) = self.build_column_filter(filter, schema.as_arrow())? {
518 plan_builder = plan_builder
519 .filter(filter_expr)
520 .context(DataFusionPlanningSnafu)?;
521 }
522 }
523 LogExpr::ScalarFunc { name, args, alias } => {
524 let schema = plan_builder.schema();
525 let expr = self.build_scalar_func(schema, name, args, alias)?;
526 plan_builder = plan_builder
527 .project([expr])
528 .context(DataFusionPlanningSnafu)?;
529 }
530 LogExpr::NamedIdent(_) | LogExpr::PositionalIdent(_) => {
531 }
533 LogExpr::Alias { expr, alias } => {
534 let schema = plan_builder.schema();
535 let df_expr = self.log_expr_to_df_expr(expr, schema)?;
536 let aliased_expr = df_expr.alias(alias);
537 plan_builder = plan_builder
538 .project([aliased_expr.clone()])
539 .context(DataFusionPlanningSnafu)?;
540 }
541 LogExpr::BinaryOp { .. } => {
542 let schema = plan_builder.schema();
543 let binary_expr = self.log_expr_to_df_expr(expr, schema)?;
544
545 plan_builder = plan_builder
546 .project([binary_expr])
547 .context(DataFusionPlanningSnafu)?;
548 }
549 _ => {
550 UnimplementedSnafu {
551 feature: "log expression",
552 }
553 .fail()?;
554 }
555 }
556 Ok(plan_builder)
557 }
558}
559
560#[cfg(test)]
561mod tests {
562 use std::sync::Arc;
563
564 use catalog::RegisterTableRequest;
565 use catalog::memory::MemoryCatalogManager;
566 use common_catalog::consts::DEFAULT_CATALOG_NAME;
567 use common_query::test_util::DummyDecoder;
568 use datafusion::execution::SessionStateBuilder;
569 use datatypes::prelude::ConcreteDataType;
570 use datatypes::schema::{ColumnSchema, SchemaRef};
571 use log_query::{
572 ColumnFilters, ConjunctionOperator, ContentFilter, Context, Filters, Limit, LogExpr,
573 };
574 use session::context::QueryContext;
575 use table::metadata::{TableInfoBuilder, TableMetaBuilder};
576 use table::table_name::TableName;
577 use table::test_util::EmptyTable;
578
579 use super::*;
580
581 fn mock_schema() -> SchemaRef {
582 let columns = vec![
583 ColumnSchema::new(
584 "message".to_string(),
585 ConcreteDataType::string_datatype(),
586 false,
587 ),
588 ColumnSchema::new(
589 "timestamp".to_string(),
590 ConcreteDataType::timestamp_millisecond_datatype(),
591 false,
592 )
593 .with_time_index(true),
594 ColumnSchema::new(
595 "host".to_string(),
596 ConcreteDataType::string_datatype(),
597 true,
598 ),
599 ColumnSchema::new(
600 "is_active".to_string(),
601 ConcreteDataType::boolean_datatype(),
602 true,
603 ),
604 ];
605
606 Arc::new(Schema::new(columns))
607 }
608
609 fn mock_schema_with_typed_columns() -> SchemaRef {
610 let columns = vec![
611 ColumnSchema::new(
612 "message".to_string(),
613 ConcreteDataType::string_datatype(),
614 false,
615 ),
616 ColumnSchema::new(
617 "timestamp".to_string(),
618 ConcreteDataType::timestamp_millisecond_datatype(),
619 false,
620 )
621 .with_time_index(true),
622 ColumnSchema::new(
623 "host".to_string(),
624 ConcreteDataType::string_datatype(),
625 true,
626 ),
627 ColumnSchema::new(
628 "is_active".to_string(),
629 ConcreteDataType::boolean_datatype(),
630 true,
631 ),
632 ColumnSchema::new("age".to_string(), ConcreteDataType::int32_datatype(), true),
634 ColumnSchema::new(
635 "score".to_string(),
636 ConcreteDataType::float64_datatype(),
637 true,
638 ),
639 ColumnSchema::new(
640 "count".to_string(),
641 ConcreteDataType::uint64_datatype(),
642 true,
643 ),
644 ];
645
646 Arc::new(Schema::new(columns))
647 }
648
649 async fn build_test_table_provider(
651 table_name_tuples: &[(String, String)],
652 ) -> DfTableSourceProvider {
653 build_test_table_provider_with_schema(table_name_tuples, mock_schema()).await
654 }
655
656 async fn build_test_table_provider_with_typed_columns(
658 table_name_tuples: &[(String, String)],
659 ) -> DfTableSourceProvider {
660 build_test_table_provider_with_schema(table_name_tuples, mock_schema_with_typed_columns())
661 .await
662 }
663
664 async fn build_test_table_provider_with_schema(
665 table_name_tuples: &[(String, String)],
666 schema: SchemaRef,
667 ) -> DfTableSourceProvider {
668 let catalog_list = MemoryCatalogManager::with_default_setup();
669 for (schema_name, table_name) in table_name_tuples {
670 let table_meta = TableMetaBuilder::empty()
671 .schema(schema.clone())
672 .primary_key_indices(vec![2])
673 .value_indices(vec![0])
674 .next_column_id(1024)
675 .build()
676 .unwrap();
677 let table_info = TableInfoBuilder::default()
678 .name(table_name.to_string())
679 .meta(table_meta)
680 .build()
681 .unwrap();
682 let table = EmptyTable::from_table_info(&table_info);
683
684 catalog_list
685 .register_table_sync(RegisterTableRequest {
686 catalog: DEFAULT_CATALOG_NAME.to_string(),
687 schema: schema_name.to_string(),
688 table_name: table_name.to_string(),
689 table_id: 1024,
690 table,
691 })
692 .unwrap();
693 }
694
695 DfTableSourceProvider::new(
696 catalog_list,
697 false,
698 QueryContext::arc(),
699 DummyDecoder::arc(),
700 false,
701 )
702 }
703
704 #[tokio::test]
705 async fn test_query_to_plan() {
706 let table_provider =
707 build_test_table_provider(&[("public".to_string(), "test_table".to_string())]).await;
708 let session_state = SessionStateBuilder::new().with_default_features().build();
709 let mut planner = LogQueryPlanner::new(table_provider, session_state);
710
711 let log_query = LogQuery {
712 table: TableName::new(DEFAULT_CATALOG_NAME, "public", "test_table"),
713 time_filter: TimeFilter {
714 start: Some("2021-01-01T00:00:00Z".to_string()),
715 end: Some("2021-01-02T00:00:00Z".to_string()),
716 span: None,
717 },
718 filters: Filters::Single(ColumnFilters {
719 expr: Box::new(LogExpr::NamedIdent("message".to_string())),
720 filters: vec![ContentFilter::Contains("error".to_string())],
721 }),
722 limit: Limit {
723 skip: None,
724 fetch: Some(100),
725 },
726 context: Context::None,
727 columns: vec![],
728 exprs: vec![],
729 };
730
731 let plan = planner.query_to_plan(log_query).await.unwrap();
732 let expected = "Limit: skip=0, fetch=100 [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N, is_active:Boolean;N]\
733\n Filter: greptime.public.test_table.timestamp >= Utf8(\"2021-01-01T00:00:00Z\") AND greptime.public.test_table.timestamp <= Utf8(\"2021-01-02T00:00:00Z\") AND greptime.public.test_table.message LIKE Utf8(\"%error%\") [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N, is_active:Boolean;N]\
734\n TableScan: greptime.public.test_table [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N, is_active:Boolean;N]";
735
736 assert_eq!(plan.display_indent_schema().to_string(), expected);
737 }
738
739 #[tokio::test]
740 async fn test_build_time_filter() {
741 let table_provider =
742 build_test_table_provider(&[("public".to_string(), "test_table".to_string())]).await;
743 let session_state = SessionStateBuilder::new().with_default_features().build();
744 let planner = LogQueryPlanner::new(table_provider, session_state);
745
746 let time_filter = TimeFilter {
747 start: Some("2021-01-01T00:00:00Z".to_string()),
748 end: Some("2021-01-02T00:00:00Z".to_string()),
749 span: None,
750 };
751
752 let expr = planner
753 .build_time_filter(&time_filter, &mock_schema())
754 .unwrap();
755
756 let expected_expr = col("timestamp")
757 .gt_eq(lit(ScalarValue::Utf8(Some(
758 "2021-01-01T00:00:00Z".to_string(),
759 ))))
760 .and(col("timestamp").lt_eq(lit(ScalarValue::Utf8(Some(
761 "2021-01-02T00:00:00Z".to_string(),
762 )))));
763
764 assert_eq!(format!("{:?}", expr), format!("{:?}", expected_expr));
765 }
766
767 #[tokio::test]
768 async fn test_build_time_filter_without_end() {
769 let table_provider =
770 build_test_table_provider(&[("public".to_string(), "test_table".to_string())]).await;
771 let session_state = SessionStateBuilder::new().with_default_features().build();
772 let planner = LogQueryPlanner::new(table_provider, session_state);
773
774 let time_filter = TimeFilter {
775 start: Some("2021-01-01T00:00:00Z".to_string()),
776 end: None,
777 span: None,
778 };
779
780 let expr = planner
781 .build_time_filter(&time_filter, &mock_schema())
782 .unwrap();
783
784 let expected_expr = col("timestamp")
785 .gt_eq(lit(ScalarValue::Utf8(Some(
786 "2021-01-01T00:00:00Z".to_string(),
787 ))))
788 .and(col("timestamp").lt_eq(lit(ScalarValue::Utf8(Some(
789 "9999-12-31T23:59:59Z".to_string(),
790 )))));
791
792 assert_eq!(format!("{:?}", expr), format!("{:?}", expected_expr));
793 }
794
795 #[tokio::test]
796 async fn test_build_content_filter() {
797 let table_provider =
798 build_test_table_provider(&[("public".to_string(), "test_table".to_string())]).await;
799 let session_state = SessionStateBuilder::new().with_default_features().build();
800 let planner = LogQueryPlanner::new(table_provider, session_state);
801 let schema = mock_schema();
802
803 let column_filter = ColumnFilters {
804 expr: Box::new(LogExpr::NamedIdent("message".to_string())),
805 filters: vec![
806 ContentFilter::Contains("error".to_string()),
807 ContentFilter::Prefix("WARN".to_string()),
808 ],
809 };
810
811 let expr_option = planner
812 .build_column_filter(&column_filter, schema.arrow_schema())
813 .unwrap();
814 assert!(expr_option.is_some());
815
816 let expr = expr_option.unwrap();
817
818 let expected_expr = col("message")
819 .like(lit(ScalarValue::Utf8(Some("%error%".to_string()))))
820 .and(col("message").like(lit(ScalarValue::Utf8(Some("WARN%".to_string())))));
821
822 assert_eq!(format!("{:?}", expr), format!("{:?}", expected_expr));
823 }
824
825 #[tokio::test]
826 async fn test_query_to_plan_with_only_skip() {
827 let table_provider =
828 build_test_table_provider(&[("public".to_string(), "test_table".to_string())]).await;
829 let session_state = SessionStateBuilder::new().with_default_features().build();
830 let mut planner = LogQueryPlanner::new(table_provider, session_state);
831
832 let log_query = LogQuery {
833 table: TableName::new(DEFAULT_CATALOG_NAME, "public", "test_table"),
834 time_filter: TimeFilter {
835 start: Some("2021-01-01T00:00:00Z".to_string()),
836 end: Some("2021-01-02T00:00:00Z".to_string()),
837 span: None,
838 },
839 filters: Filters::Single(ColumnFilters {
840 expr: Box::new(LogExpr::NamedIdent("message".to_string())),
841 filters: vec![ContentFilter::Contains("error".to_string())],
842 }),
843 limit: Limit {
844 skip: Some(10),
845 fetch: None,
846 },
847 context: Context::None,
848 columns: vec![],
849 exprs: vec![],
850 };
851
852 let plan = planner.query_to_plan(log_query).await.unwrap();
853 let expected = "Limit: skip=10, fetch=1000 [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N, is_active:Boolean;N]\
854\n Filter: greptime.public.test_table.timestamp >= Utf8(\"2021-01-01T00:00:00Z\") AND greptime.public.test_table.timestamp <= Utf8(\"2021-01-02T00:00:00Z\") AND greptime.public.test_table.message LIKE Utf8(\"%error%\") [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N, is_active:Boolean;N]\
855\n TableScan: greptime.public.test_table [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N, is_active:Boolean;N]";
856
857 assert_eq!(plan.display_indent_schema().to_string(), expected);
858 }
859
860 #[tokio::test]
861 async fn test_query_to_plan_without_limit() {
862 let table_provider =
863 build_test_table_provider(&[("public".to_string(), "test_table".to_string())]).await;
864 let session_state = SessionStateBuilder::new().with_default_features().build();
865 let mut planner = LogQueryPlanner::new(table_provider, session_state);
866
867 let log_query = LogQuery {
868 table: TableName::new(DEFAULT_CATALOG_NAME, "public", "test_table"),
869 time_filter: TimeFilter {
870 start: Some("2021-01-01T00:00:00Z".to_string()),
871 end: Some("2021-01-02T00:00:00Z".to_string()),
872 span: None,
873 },
874 filters: Filters::Single(ColumnFilters {
875 expr: Box::new(LogExpr::NamedIdent("message".to_string())),
876 filters: vec![ContentFilter::Contains("error".to_string())],
877 }),
878 limit: Limit {
879 skip: None,
880 fetch: None,
881 },
882 context: Context::None,
883 columns: vec![],
884 exprs: vec![],
885 };
886
887 let plan = planner.query_to_plan(log_query).await.unwrap();
888 let expected = "Limit: skip=0, fetch=1000 [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N, is_active:Boolean;N]\
889\n Filter: greptime.public.test_table.timestamp >= Utf8(\"2021-01-01T00:00:00Z\") AND greptime.public.test_table.timestamp <= Utf8(\"2021-01-02T00:00:00Z\") AND greptime.public.test_table.message LIKE Utf8(\"%error%\") [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N, is_active:Boolean;N]\
890\n TableScan: greptime.public.test_table [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N, is_active:Boolean;N]";
891
892 assert_eq!(plan.display_indent_schema().to_string(), expected);
893 }
894
895 #[test]
896 fn test_escape_pattern() {
897 assert_eq!(escape_like_pattern("test"), "test");
898 assert_eq!(escape_like_pattern("te%st"), "te\\%st");
899 assert_eq!(escape_like_pattern("te_st"), "te\\_st");
900 assert_eq!(escape_like_pattern("te\\st"), "te\\\\st");
901 }
902
903 #[tokio::test]
904 async fn test_query_to_plan_with_aggr_func() {
905 let table_provider =
906 build_test_table_provider(&[("public".to_string(), "test_table".to_string())]).await;
907 let session_state = SessionStateBuilder::new().with_default_features().build();
908 let mut planner = LogQueryPlanner::new(table_provider, session_state);
909
910 let log_query = LogQuery {
911 table: TableName::new(DEFAULT_CATALOG_NAME, "public", "test_table"),
912 time_filter: TimeFilter {
913 start: Some("2021-01-01T00:00:00Z".to_string()),
914 end: Some("2021-01-02T00:00:00Z".to_string()),
915 span: None,
916 },
917 filters: Default::default(),
918 limit: Limit {
919 skip: None,
920 fetch: Some(100),
921 },
922 context: Context::None,
923 columns: vec![],
924 exprs: vec![LogExpr::AggrFunc {
925 expr: vec![AggFunc::new(
926 "count".to_string(),
927 vec![LogExpr::NamedIdent("message".to_string())],
928 Some("count_result".to_string()),
929 )],
930 by: vec![LogExpr::NamedIdent("host".to_string())],
931 }],
932 };
933
934 let plan = planner.query_to_plan(log_query).await.unwrap();
935 let expected = "Aggregate: groupBy=[[greptime.public.test_table.host]], aggr=[[count(greptime.public.test_table.message) AS count_result]] [host:Utf8;N, count_result:Int64]\
936\n Limit: skip=0, fetch=100 [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N, is_active:Boolean;N]\
937\n Filter: greptime.public.test_table.timestamp >= Utf8(\"2021-01-01T00:00:00Z\") AND greptime.public.test_table.timestamp <= Utf8(\"2021-01-02T00:00:00Z\") [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N, is_active:Boolean;N]\
938\n TableScan: greptime.public.test_table [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N, is_active:Boolean;N]";
939
940 assert_eq!(plan.display_indent_schema().to_string(), expected);
941 }
942
943 #[tokio::test]
944 async fn test_query_to_plan_with_scalar_func() {
945 let table_provider =
946 build_test_table_provider(&[("public".to_string(), "test_table".to_string())]).await;
947 let session_state = SessionStateBuilder::new().with_default_features().build();
948 let mut planner = LogQueryPlanner::new(table_provider, session_state);
949
950 let log_query = LogQuery {
951 table: TableName::new(DEFAULT_CATALOG_NAME, "public", "test_table"),
952 time_filter: TimeFilter {
953 start: Some("2021-01-01T00:00:00Z".to_string()),
954 end: Some("2021-01-02T00:00:00Z".to_string()),
955 span: None,
956 },
957 filters: Default::default(),
958 limit: Limit {
959 skip: None,
960 fetch: Some(100),
961 },
962 context: Context::None,
963 columns: vec![],
964 exprs: vec![LogExpr::ScalarFunc {
965 name: "date_trunc".to_string(),
966 args: vec![
967 LogExpr::NamedIdent("timestamp".to_string()),
968 LogExpr::Literal("day".to_string()),
969 ],
970 alias: Some("time_bucket".to_string()),
971 }],
972 };
973
974 let plan = planner.query_to_plan(log_query).await.unwrap();
975 let expected = "Projection: date_trunc(greptime.public.test_table.timestamp, Utf8(\"day\")) AS time_bucket [time_bucket:Timestamp(Nanosecond, None);N]\
976 \n Limit: skip=0, fetch=100 [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N, is_active:Boolean;N]\
977 \n Filter: greptime.public.test_table.timestamp >= Utf8(\"2021-01-01T00:00:00Z\") AND greptime.public.test_table.timestamp <= Utf8(\"2021-01-02T00:00:00Z\") [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N, is_active:Boolean;N]\
978 \n TableScan: greptime.public.test_table [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N, is_active:Boolean;N]";
979
980 assert_eq!(plan.display_indent_schema().to_string(), expected);
981 }
982
983 #[tokio::test]
984 async fn test_build_content_filter_between() {
985 let table_provider =
986 build_test_table_provider(&[("public".to_string(), "test_table".to_string())]).await;
987 let session_state = SessionStateBuilder::new().with_default_features().build();
988 let planner = LogQueryPlanner::new(table_provider, session_state);
989 let schema = mock_schema();
990
991 let column_filter = ColumnFilters {
992 expr: Box::new(LogExpr::NamedIdent("message".to_string())),
993 filters: vec![ContentFilter::Between {
994 start: "a".to_string(),
995 end: "z".to_string(),
996 start_inclusive: true,
997 end_inclusive: false,
998 }],
999 };
1000
1001 let expr_option = planner
1002 .build_column_filter(&column_filter, schema.arrow_schema())
1003 .unwrap();
1004 assert!(expr_option.is_some());
1005
1006 let expr = expr_option.unwrap();
1007 let expected_expr = col("message")
1008 .gt_eq(lit(ScalarValue::Utf8(Some("a".to_string()))))
1009 .and(col("message").lt(lit(ScalarValue::Utf8(Some("z".to_string())))));
1010
1011 assert_eq!(format!("{:?}", expr), format!("{:?}", expected_expr));
1012 }
1013
    #[tokio::test]
    async fn test_query_to_plan_with_date_histogram() {
        // End-to-end plan test for a date-histogram style query: a `date_bin`
        // projection that buckets timestamps, followed by a count aggregation
        // grouped on the bucket alias.
        let table_provider =
            build_test_table_provider(&[("public".to_string(), "test_table".to_string())]).await;
        let session_state = SessionStateBuilder::new().with_default_features().build();
        let mut planner = LogQueryPlanner::new(table_provider, session_state);

        let log_query = LogQuery {
            table: TableName::new(DEFAULT_CATALOG_NAME, "public", "test_table"),
            time_filter: TimeFilter {
                start: Some("2021-01-01T00:00:00Z".to_string()),
                end: Some("2021-01-02T00:00:00Z".to_string()),
                span: None,
            },
            filters: Default::default(),
            // `fetch: None` — the expected plan below shows the planner's
            // default limit of 1000 being applied (`fetch=1000`).
            limit: Limit {
                skip: Some(0),
                fetch: None,
            },
            context: Context::None,
            columns: vec![],
            exprs: vec![
                // Step 1: bucket rows into 30-second bins, aliased with the
                // generated date-histogram bucket identifier.
                LogExpr::ScalarFunc {
                    name: "date_bin".to_string(),
                    args: vec![
                        LogExpr::Literal("30 seconds".to_string()),
                        LogExpr::NamedIdent("timestamp".to_string()),
                    ],
                    alias: Some("2__date_histogram__time_bucket".to_string()),
                },
                // Step 2: count per bucket. PositionalIdent(0) presumably
                // refers to the first output of the previous step — confirm
                // against the planner's positional-ident resolution.
                LogExpr::AggrFunc {
                    expr: vec![AggFunc::new(
                        "count".to_string(),
                        vec![LogExpr::PositionalIdent(0)],
                        Some("count_result".to_string()),
                    )],
                    by: vec![LogExpr::NamedIdent(
                        "2__date_histogram__time_bucket".to_string(),
                    )],
                },
            ],
        };

        let plan = planner.query_to_plan(log_query).await.unwrap();
        // Expected shape, innermost to outermost: scan -> time filter ->
        // default limit -> date_bin projection -> grouped count aggregate.
        let expected = "Aggregate: groupBy=[[2__date_histogram__time_bucket]], aggr=[[count(2__date_histogram__time_bucket) AS count_result]] [2__date_histogram__time_bucket:Timestamp(Nanosecond, None);N, count_result:Int64]\
        \n  Projection: date_bin(Utf8(\"30 seconds\"), greptime.public.test_table.timestamp) AS 2__date_histogram__time_bucket [2__date_histogram__time_bucket:Timestamp(Nanosecond, None);N]\
        \n    Limit: skip=0, fetch=1000 [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N, is_active:Boolean;N]\
        \n      Filter: greptime.public.test_table.timestamp >= Utf8(\"2021-01-01T00:00:00Z\") AND greptime.public.test_table.timestamp <= Utf8(\"2021-01-02T00:00:00Z\") [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N, is_active:Boolean;N]\
        \n        TableScan: greptime.public.test_table [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N, is_active:Boolean;N]";

        assert_eq!(plan.display_indent_schema().to_string(), expected);
    }
1066
1067 #[tokio::test]
1068 async fn test_build_compound_filter() {
1069 let table_provider =
1070 build_test_table_provider(&[("public".to_string(), "test_table".to_string())]).await;
1071 let session_state = SessionStateBuilder::new().with_default_features().build();
1072 let planner = LogQueryPlanner::new(table_provider, session_state);
1073 let schema = mock_schema();
1074
1075 let column_filter = ColumnFilters {
1077 expr: Box::new(LogExpr::NamedIdent("message".to_string())),
1078 filters: vec![
1079 ContentFilter::Contains("error".to_string()),
1080 ContentFilter::Prefix("WARN".to_string()),
1081 ],
1082 };
1083 let expr = planner
1084 .build_column_filter(&column_filter, schema.arrow_schema())
1085 .unwrap()
1086 .unwrap();
1087
1088 let expected_expr = col("message")
1089 .like(lit(ScalarValue::Utf8(Some("%error%".to_string()))))
1090 .and(col("message").like(lit(ScalarValue::Utf8(Some("WARN%".to_string())))));
1091
1092 assert_eq!(format!("{:?}", expr), format!("{:?}", expected_expr));
1093
1094 let column_filter = ColumnFilters {
1096 expr: Box::new(LogExpr::NamedIdent("message".to_string())),
1097 filters: vec![ContentFilter::Compound(
1098 vec![
1099 ContentFilter::Contains("error".to_string()),
1100 ContentFilter::Prefix("WARN".to_string()),
1101 ],
1102 ConjunctionOperator::Or,
1103 )],
1104 };
1105 let expr = planner
1106 .build_column_filter(&column_filter, schema.arrow_schema())
1107 .unwrap()
1108 .unwrap();
1109
1110 let expected_expr = col("message")
1111 .like(lit(ScalarValue::Utf8(Some("%error%".to_string()))))
1112 .or(col("message").like(lit(ScalarValue::Utf8(Some("WARN%".to_string())))));
1113
1114 assert_eq!(format!("{:?}", expr), format!("{:?}", expected_expr));
1115
1116 let column_filter = ColumnFilters {
1118 expr: Box::new(LogExpr::NamedIdent("message".to_string())),
1119 filters: vec![ContentFilter::Compound(
1120 vec![
1121 ContentFilter::Contains("error".to_string()),
1122 ContentFilter::Compound(
1123 vec![
1124 ContentFilter::Prefix("WARN".to_string()),
1125 ContentFilter::Exact("DEBUG".to_string()),
1126 ],
1127 ConjunctionOperator::Or,
1128 ),
1129 ],
1130 ConjunctionOperator::And,
1131 )],
1132 };
1133 let expr = planner
1134 .build_column_filter(&column_filter, schema.arrow_schema())
1135 .unwrap()
1136 .unwrap();
1137
1138 let expected_nested = col("message")
1139 .like(lit(ScalarValue::Utf8(Some("WARN%".to_string()))))
1140 .or(col("message").like(lit(ScalarValue::Utf8(Some("DEBUG".to_string())))));
1141 let expected_expr = col("message")
1142 .like(lit(ScalarValue::Utf8(Some("%error%".to_string()))))
1143 .and(expected_nested);
1144
1145 assert_eq!(format!("{:?}", expr), format!("{:?}", expected_expr));
1146 }
1147
1148 #[tokio::test]
1149 async fn test_build_great_than_filter() {
1150 let table_provider =
1151 build_test_table_provider(&[("public".to_string(), "test_table".to_string())]).await;
1152 let session_state = SessionStateBuilder::new().with_default_features().build();
1153 let planner = LogQueryPlanner::new(table_provider, session_state);
1154 let schema = mock_schema();
1155
1156 let column_filter = ColumnFilters {
1158 expr: Box::new(LogExpr::NamedIdent("message".to_string())),
1159 filters: vec![ContentFilter::GreatThan {
1160 value: "error".to_string(),
1161 inclusive: true,
1162 }],
1163 };
1164
1165 let expr_option = planner
1166 .build_column_filter(&column_filter, schema.arrow_schema())
1167 .unwrap();
1168 assert!(expr_option.is_some());
1169
1170 let expr = expr_option.unwrap();
1171 let expected_expr = col("message").gt_eq(lit(ScalarValue::Utf8(Some("error".to_string()))));
1172
1173 assert_eq!(format!("{:?}", expr), format!("{:?}", expected_expr));
1174
1175 let column_filter = ColumnFilters {
1177 expr: Box::new(LogExpr::NamedIdent("message".to_string())),
1178 filters: vec![ContentFilter::GreatThan {
1179 value: "error".to_string(),
1180 inclusive: false,
1181 }],
1182 };
1183
1184 let expr_option = planner
1185 .build_column_filter(&column_filter, schema.arrow_schema())
1186 .unwrap();
1187 assert!(expr_option.is_some());
1188
1189 let expr = expr_option.unwrap();
1190 let expected_expr = col("message").gt(lit(ScalarValue::Utf8(Some("error".to_string()))));
1191
1192 assert_eq!(format!("{:?}", expr), format!("{:?}", expected_expr));
1193 }
1194
1195 #[tokio::test]
1196 async fn test_build_less_than_filter() {
1197 let table_provider =
1198 build_test_table_provider(&[("public".to_string(), "test_table".to_string())]).await;
1199 let session_state = SessionStateBuilder::new().with_default_features().build();
1200 let planner = LogQueryPlanner::new(table_provider, session_state);
1201 let schema = mock_schema();
1202
1203 let column_filter = ColumnFilters {
1205 expr: Box::new(LogExpr::NamedIdent("message".to_string())),
1206 filters: vec![ContentFilter::LessThan {
1207 value: "error".to_string(),
1208 inclusive: true,
1209 }],
1210 };
1211
1212 let expr_option = planner
1213 .build_column_filter(&column_filter, schema.arrow_schema())
1214 .unwrap();
1215 assert!(expr_option.is_some());
1216
1217 let expr = expr_option.unwrap();
1218 let expected_expr = col("message").lt_eq(lit(ScalarValue::Utf8(Some("error".to_string()))));
1219
1220 assert_eq!(format!("{:?}", expr), format!("{:?}", expected_expr));
1221
1222 let column_filter = ColumnFilters {
1224 expr: Box::new(LogExpr::NamedIdent("message".to_string())),
1225 filters: vec![ContentFilter::LessThan {
1226 value: "error".to_string(),
1227 inclusive: false,
1228 }],
1229 };
1230
1231 let expr_option = planner
1232 .build_column_filter(&column_filter, schema.arrow_schema())
1233 .unwrap();
1234 assert!(expr_option.is_some());
1235
1236 let expr = expr_option.unwrap();
1237 let expected_expr = col("message").lt(lit(ScalarValue::Utf8(Some("error".to_string()))));
1238
1239 assert_eq!(format!("{:?}", expr), format!("{:?}", expected_expr));
1240 }
1241
1242 #[tokio::test]
1243 async fn test_build_in_filter() {
1244 let table_provider =
1245 build_test_table_provider(&[("public".to_string(), "test_table".to_string())]).await;
1246 let session_state = SessionStateBuilder::new().with_default_features().build();
1247 let planner = LogQueryPlanner::new(table_provider, session_state);
1248 let schema = mock_schema();
1249
1250 let column_filter = ColumnFilters {
1252 expr: Box::new(LogExpr::NamedIdent("message".to_string())),
1253 filters: vec![ContentFilter::In(vec![
1254 "error".to_string(),
1255 "warning".to_string(),
1256 "info".to_string(),
1257 ])],
1258 };
1259
1260 let expr_option = planner
1261 .build_column_filter(&column_filter, schema.arrow_schema())
1262 .unwrap();
1263 assert!(expr_option.is_some());
1264
1265 let expr = expr_option.unwrap();
1266 let expected_expr = col("message").in_list(
1267 vec![
1268 lit(ScalarValue::Utf8(Some("error".to_string()))),
1269 lit(ScalarValue::Utf8(Some("warning".to_string()))),
1270 lit(ScalarValue::Utf8(Some("info".to_string()))),
1271 ],
1272 false,
1273 );
1274
1275 assert_eq!(format!("{:?}", expr), format!("{:?}", expected_expr));
1276 }
1277
1278 #[tokio::test]
1279 async fn test_build_is_true_filter() {
1280 let table_provider =
1281 build_test_table_provider(&[("public".to_string(), "test_table".to_string())]).await;
1282 let session_state = SessionStateBuilder::new().with_default_features().build();
1283 let planner = LogQueryPlanner::new(table_provider, session_state);
1284 let schema = mock_schema();
1285
1286 let column_filter = ColumnFilters {
1288 expr: Box::new(LogExpr::NamedIdent("is_active".to_string())),
1289 filters: vec![ContentFilter::IsTrue],
1290 };
1291
1292 let expr_option = planner
1293 .build_column_filter(&column_filter, schema.arrow_schema())
1294 .unwrap();
1295 assert!(expr_option.is_some());
1296
1297 let expr = expr_option.unwrap();
1298 let expected_expr_string =
1299 "IsTrue(Column(Column { relation: None, name: \"is_active\" }))".to_string();
1300
1301 assert_eq!(format!("{:?}", expr), expected_expr_string);
1302 }
1303
1304 #[tokio::test]
1305 async fn test_build_filter_with_scalar_fn() {
1306 let table_provider =
1307 build_test_table_provider(&[("public".to_string(), "test_table".to_string())]).await;
1308 let session_state = SessionStateBuilder::new().with_default_features().build();
1309 let planner = LogQueryPlanner::new(table_provider, session_state);
1310 let schema = mock_schema();
1311
1312 let column_filter = ColumnFilters {
1313 expr: Box::new(LogExpr::BinaryOp {
1314 left: Box::new(LogExpr::ScalarFunc {
1315 name: "character_length".to_string(),
1316 args: vec![LogExpr::NamedIdent("message".to_string())],
1317 alias: None,
1318 }),
1319 op: BinaryOperator::Gt,
1320 right: Box::new(LogExpr::Literal("100".to_string())),
1321 }),
1322 filters: vec![ContentFilter::IsTrue],
1323 };
1324
1325 let expr_option = planner
1326 .build_column_filter(&column_filter, schema.arrow_schema())
1327 .unwrap();
1328 assert!(expr_option.is_some());
1329
1330 let expr = expr_option.unwrap();
1331 let expected_expr_string = "character_length(message) > Int32(100) IS TRUE";
1332
1333 assert_eq!(format!("{}", expr), expected_expr_string);
1334 }
1335
1336 #[tokio::test]
1337 async fn test_type_inference_float_comparison() {
1338 let table_provider = build_test_table_provider_with_typed_columns(&[(
1339 "public".to_string(),
1340 "test_table".to_string(),
1341 )])
1342 .await;
1343 let session_state = SessionStateBuilder::new().with_default_features().build();
1344 let planner = LogQueryPlanner::new(table_provider, session_state);
1345 let schema = mock_schema_with_typed_columns();
1346
1347 let column_filter = ColumnFilters {
1349 expr: Box::new(LogExpr::NamedIdent("score".to_string())),
1350 filters: vec![ContentFilter::Between {
1351 start: "75.5".to_string(),
1352 end: "100.0".to_string(),
1353 start_inclusive: true,
1354 end_inclusive: false,
1355 }],
1356 };
1357
1358 let expr_option = planner
1359 .build_column_filter(&column_filter, schema.arrow_schema())
1360 .unwrap();
1361 assert!(expr_option.is_some());
1362
1363 let expr = expr_option.unwrap();
1364 let expected_expr = col("score")
1366 .gt_eq(lit(ScalarValue::Float64(Some(75.5))))
1367 .and(col("score").lt(lit(ScalarValue::Float64(Some(100.0)))));
1368
1369 assert_eq!(format!("{:?}", expr), format!("{:?}", expected_expr));
1370 }
1371
1372 #[tokio::test]
1373 async fn test_type_inference_boolean_comparison() {
1374 let table_provider = build_test_table_provider_with_typed_columns(&[(
1375 "public".to_string(),
1376 "test_table".to_string(),
1377 )])
1378 .await;
1379 let session_state = SessionStateBuilder::new().with_default_features().build();
1380 let planner = LogQueryPlanner::new(table_provider, session_state);
1381 let schema = mock_schema_with_typed_columns();
1382
1383 let column_filter = ColumnFilters {
1385 expr: Box::new(LogExpr::NamedIdent("is_active".to_string())),
1386 filters: vec![ContentFilter::In(vec![
1387 "true".to_string(),
1388 "1".to_string(),
1389 "false".to_string(),
1390 ])],
1391 };
1392
1393 let expr_option = planner
1394 .build_column_filter(&column_filter, schema.arrow_schema())
1395 .unwrap();
1396 assert!(expr_option.is_some());
1397
1398 let expr = expr_option.unwrap();
1399 let expected_expr = col("is_active").in_list(
1401 vec![
1402 lit(ScalarValue::Boolean(Some(true))),
1403 lit(ScalarValue::Boolean(Some(true))),
1404 lit(ScalarValue::Boolean(Some(false))),
1405 ],
1406 false,
1407 );
1408
1409 assert_eq!(format!("{:?}", expr), format!("{:?}", expected_expr));
1410 }
1411
1412 #[tokio::test]
1413 async fn test_fallback_to_utf8_on_parse_failure() {
1414 let table_provider = build_test_table_provider_with_typed_columns(&[(
1415 "public".to_string(),
1416 "test_table".to_string(),
1417 )])
1418 .await;
1419 let session_state = SessionStateBuilder::new().with_default_features().build();
1420 let planner = LogQueryPlanner::new(table_provider, session_state);
1421 let schema = mock_schema_with_typed_columns();
1422
1423 let column_filter = ColumnFilters {
1425 expr: Box::new(LogExpr::NamedIdent("age".to_string())),
1426 filters: vec![ContentFilter::GreatThan {
1427 value: "not_a_number".to_string(),
1428 inclusive: false,
1429 }],
1430 };
1431
1432 let expr_option = planner
1433 .build_column_filter(&column_filter, schema.arrow_schema())
1434 .unwrap();
1435 assert!(expr_option.is_some());
1436
1437 let expr = expr_option.unwrap();
1438 let expected_expr = col("age").gt(lit(ScalarValue::Utf8(Some("not_a_number".to_string()))));
1440
1441 assert_eq!(format!("{:?}", expr), format!("{:?}", expected_expr));
1442 }
1443
1444 #[tokio::test]
1445 async fn test_string_column_remains_utf8() {
1446 let table_provider = build_test_table_provider_with_typed_columns(&[(
1447 "public".to_string(),
1448 "test_table".to_string(),
1449 )])
1450 .await;
1451 let session_state = SessionStateBuilder::new().with_default_features().build();
1452 let planner = LogQueryPlanner::new(table_provider, session_state);
1453 let schema = mock_schema_with_typed_columns();
1454
1455 let column_filter = ColumnFilters {
1457 expr: Box::new(LogExpr::NamedIdent("message".to_string())),
1458 filters: vec![ContentFilter::GreatThan {
1459 value: "123".to_string(),
1460 inclusive: false,
1461 }],
1462 };
1463
1464 let expr_option = planner
1465 .build_column_filter(&column_filter, schema.arrow_schema())
1466 .unwrap();
1467 assert!(expr_option.is_some());
1468
1469 let expr = expr_option.unwrap();
1470 let expected_expr = col("message").gt(lit(ScalarValue::Utf8(Some("123".to_string()))));
1472
1473 assert_eq!(format!("{:?}", expr), format!("{:?}", expected_expr));
1474 }
1475
1476 #[tokio::test]
1477 async fn test_all_binary_operators() {
1478 let table_provider = build_test_table_provider_with_typed_columns(&[(
1479 "public".to_string(),
1480 "test_table".to_string(),
1481 )])
1482 .await;
1483 let session_state = SessionStateBuilder::new().with_default_features().build();
1484 let planner = LogQueryPlanner::new(table_provider, session_state);
1485 let schema = mock_schema_with_typed_columns();
1486
1487 let df_schema = DFSchema::try_from(schema.arrow_schema().clone()).unwrap();
1488
1489 let test_cases = vec![
1491 (BinaryOperator::Eq, Operator::Eq),
1492 (BinaryOperator::Ne, Operator::NotEq),
1493 (BinaryOperator::Lt, Operator::Lt),
1494 (BinaryOperator::Le, Operator::LtEq),
1495 (BinaryOperator::Gt, Operator::Gt),
1496 (BinaryOperator::Ge, Operator::GtEq),
1497 (BinaryOperator::Plus, Operator::Plus),
1498 (BinaryOperator::Minus, Operator::Minus),
1499 (BinaryOperator::Multiply, Operator::Multiply),
1500 (BinaryOperator::Divide, Operator::Divide),
1501 (BinaryOperator::Modulo, Operator::Modulo),
1502 (BinaryOperator::And, Operator::And),
1503 (BinaryOperator::Or, Operator::Or),
1504 ];
1505
1506 for (binary_op, expected_df_op) in test_cases {
1507 let binary_expr = LogExpr::BinaryOp {
1508 left: Box::new(LogExpr::NamedIdent("age".to_string())),
1509 op: binary_op,
1510 right: Box::new(LogExpr::Literal("25".to_string())),
1511 };
1512
1513 let expr = planner
1514 .log_expr_to_df_expr(&binary_expr, &df_schema)
1515 .unwrap();
1516
1517 let expected_expr = Expr::BinaryExpr(BinaryExpr {
1518 left: Box::new(col("age")),
1519 op: expected_df_op,
1520 right: Box::new(lit(ScalarValue::Int32(Some(25)))),
1521 });
1522
1523 assert_eq!(format!("{:?}", expr), format!("{:?}", expected_expr));
1524 }
1525 }
1526
1527 #[tokio::test]
1528 async fn test_nested_binary_operations() {
1529 let table_provider = build_test_table_provider_with_typed_columns(&[(
1530 "public".to_string(),
1531 "test_table".to_string(),
1532 )])
1533 .await;
1534 let session_state = SessionStateBuilder::new().with_default_features().build();
1535 let planner = LogQueryPlanner::new(table_provider, session_state);
1536 let schema = mock_schema_with_typed_columns();
1537
1538 let df_schema = DFSchema::try_from(schema.arrow_schema().clone()).unwrap();
1539
1540 let nested_binary_expr = LogExpr::BinaryOp {
1542 left: Box::new(LogExpr::BinaryOp {
1543 left: Box::new(LogExpr::NamedIdent("age".to_string())),
1544 op: BinaryOperator::Plus,
1545 right: Box::new(LogExpr::Literal("5".to_string())),
1546 }),
1547 op: BinaryOperator::Gt,
1548 right: Box::new(LogExpr::Literal("30".to_string())),
1549 };
1550
1551 let expr = planner
1552 .log_expr_to_df_expr(&nested_binary_expr, &df_schema)
1553 .unwrap();
1554
1555 let expected_expr_debug = r#"BinaryExpr(BinaryExpr { left: BinaryExpr(BinaryExpr { left: Column(Column { relation: None, name: "age" }), op: Plus, right: Literal(Int32(5), None) }), op: Gt, right: Literal(Int32(30), None) })"#;
1557 assert_eq!(format!("{:?}", expr), expected_expr_debug);
1558 }
1559}