1use arrow_schema::{DataType, Schema as ArrowSchema};
16use catalog::table_source::DfTableSourceProvider;
17use common_function::utils::escape_like_pattern;
18use datafusion::datasource::DefaultTableSource;
19use datafusion::execution::SessionState;
20use datafusion_common::{DFSchema, ScalarValue};
21use datafusion_expr::utils::{conjunction, disjunction};
22use datafusion_expr::{
23 col, lit, not, BinaryExpr, Expr, ExprSchemable, LogicalPlan, LogicalPlanBuilder, Operator,
24};
25use datafusion_sql::TableReference;
26use datatypes::schema::Schema;
27use log_query::{BinaryOperator, EqualValue, LogExpr, LogQuery, TimeFilter};
28use snafu::{OptionExt, ResultExt};
29use table::table::adapter::DfTableProviderAdapter;
30
31use crate::log_query::error::{
32 CatalogSnafu, DataFusionPlanningSnafu, Result, TimeIndexNotFoundSnafu, UnexpectedLogExprSnafu,
33 UnimplementedSnafu, UnknownAggregateFunctionSnafu, UnknownScalarFunctionSnafu,
34 UnknownTableSnafu,
35};
36
/// Default number of rows to fetch when the query specifies no limit.
const DEFAULT_LIMIT: usize = 1000;
38
/// Plans [`LogQuery`] requests into DataFusion [`LogicalPlan`]s.
pub struct LogQueryPlanner {
    // Resolves table references against the catalog.
    table_provider: DfTableSourceProvider,
    // Session state used to look up scalar/aggregate functions.
    session_state: SessionState,
}
43
44impl LogQueryPlanner {
    /// Creates a planner over the given table provider and session state.
    pub fn new(table_provider: DfTableSourceProvider, session_state: SessionState) -> Self {
        Self {
            table_provider,
            session_state,
        }
    }
51
52 pub async fn query_to_plan(&mut self, query: LogQuery) -> Result<LogicalPlan> {
53 let table_ref: TableReference = query.table.table_ref().into();
55 let table_source = self
56 .table_provider
57 .resolve_table(table_ref.clone())
58 .await
59 .context(CatalogSnafu)?;
60 let schema = table_source
61 .as_any()
62 .downcast_ref::<DefaultTableSource>()
63 .context(UnknownTableSnafu)?
64 .table_provider
65 .as_any()
66 .downcast_ref::<DfTableProviderAdapter>()
67 .context(UnknownTableSnafu)?
68 .table()
69 .schema();
70
71 let mut plan_builder = LogicalPlanBuilder::scan(table_ref, table_source, None)
73 .context(DataFusionPlanningSnafu)?;
74 let df_schema = plan_builder.schema().clone();
75
76 let mut filters = Vec::new();
78
79 filters.push(self.build_time_filter(&query.time_filter, &schema)?);
81
82 if let Some(filters_expr) = self.build_filters(&query.filters, df_schema.as_arrow())? {
83 filters.push(filters_expr);
84 }
85
86 if !filters.is_empty() {
88 let filter_expr = filters.into_iter().reduce(|a, b| a.and(b)).unwrap();
89 plan_builder = plan_builder
90 .filter(filter_expr)
91 .context(DataFusionPlanningSnafu)?;
92 }
93
94 if !query.columns.is_empty() {
96 let projected_columns = query.columns.iter().map(col).collect::<Vec<_>>();
97 plan_builder = plan_builder
98 .project(projected_columns)
99 .context(DataFusionPlanningSnafu)?;
100 }
101
102 plan_builder = plan_builder
104 .limit(
105 query.limit.skip.unwrap_or(0),
106 Some(query.limit.fetch.unwrap_or(DEFAULT_LIMIT)),
107 )
108 .context(DataFusionPlanningSnafu)?;
109
110 for expr in &query.exprs {
112 plan_builder = self.process_log_expr(plan_builder, expr)?;
113 }
114
115 let plan = plan_builder.build().context(DataFusionPlanningSnafu)?;
117
118 Ok(plan)
119 }
120
121 fn build_time_filter(&self, time_filter: &TimeFilter, schema: &Schema) -> Result<Expr> {
122 let timestamp_col = schema
123 .timestamp_column()
124 .with_context(|| TimeIndexNotFoundSnafu {})?
125 .name
126 .clone();
127
128 let start_time = ScalarValue::Utf8(time_filter.start.clone());
129 let end_time = ScalarValue::Utf8(
130 time_filter
131 .end
132 .clone()
133 .or(Some("9999-12-31T23:59:59Z".to_string())),
134 );
135 let expr = col(timestamp_col.clone())
136 .gt_eq(lit(start_time))
137 .and(col(timestamp_col).lt_eq(lit(end_time)));
138
139 Ok(expr)
140 }
141 fn build_filters(
143 &self,
144 filters: &log_query::Filters,
145 schema: &ArrowSchema,
146 ) -> Result<Option<Expr>> {
147 match filters {
148 log_query::Filters::And(filters) => {
149 let exprs = filters
150 .iter()
151 .filter_map(|filter| self.build_filters(filter, schema).transpose())
152 .try_collect::<Vec<_>>()?;
153 if exprs.is_empty() {
154 Ok(None)
155 } else {
156 Ok(conjunction(exprs))
157 }
158 }
159 log_query::Filters::Or(filters) => {
160 let exprs = filters
161 .iter()
162 .filter_map(|filter| self.build_filters(filter, schema).transpose())
163 .try_collect::<Vec<_>>()?;
164 if exprs.is_empty() {
165 Ok(None)
166 } else {
167 Ok(disjunction(exprs))
168 }
169 }
170 log_query::Filters::Not(filter) => {
171 if let Some(expr) = self.build_filters(filter, schema)? {
172 Ok(Some(not(expr)))
173 } else {
174 Ok(None)
175 }
176 }
177 log_query::Filters::Single(column_filters) => {
178 self.build_column_filter(column_filters, schema)
180 }
181 }
182 }
183
184 fn build_column_filter(
186 &self,
187 column_filter: &log_query::ColumnFilters,
188 schema: &ArrowSchema,
189 ) -> Result<Option<Expr>> {
190 let df_schema = DFSchema::try_from(schema.clone()).context(DataFusionPlanningSnafu)?;
192 let col_expr = self.log_expr_to_df_expr(&column_filter.expr, &df_schema)?;
193
194 let filter_exprs = column_filter
195 .filters
196 .iter()
197 .filter_map(|filter| {
198 self.build_content_filter_with_expr(col_expr.clone(), filter, &df_schema)
199 .transpose()
200 })
201 .try_collect::<Vec<_>>()?;
202
203 if filter_exprs.is_empty() {
204 return Ok(Some(col_expr.is_true()));
205 }
206
207 Ok(conjunction(filter_exprs))
209 }
210
211 #[allow(clippy::only_used_in_recursion)]
213 fn build_content_filter_with_expr(
214 &self,
215 col_expr: Expr,
216 filter: &log_query::ContentFilter,
217 schema: &DFSchema,
218 ) -> Result<Option<Expr>> {
219 match filter {
220 log_query::ContentFilter::Exact(value) => Ok(Some(
221 col_expr.like(lit(ScalarValue::Utf8(Some(escape_like_pattern(value))))),
222 )),
223 log_query::ContentFilter::Prefix(value) => Ok(Some(col_expr.like(lit(
224 ScalarValue::Utf8(Some(format!("{}%", escape_like_pattern(value)))),
225 )))),
226 log_query::ContentFilter::Postfix(value) => Ok(Some(col_expr.like(lit(
227 ScalarValue::Utf8(Some(format!("%{}", escape_like_pattern(value)))),
228 )))),
229 log_query::ContentFilter::Contains(value) => Ok(Some(col_expr.like(lit(
230 ScalarValue::Utf8(Some(format!("%{}%", escape_like_pattern(value)))),
231 )))),
232 log_query::ContentFilter::Regex(_pattern) => Err(UnimplementedSnafu {
233 feature: "regex filter",
234 }
235 .build()),
236 log_query::ContentFilter::Exist => Ok(Some(col_expr.is_not_null())),
237 log_query::ContentFilter::Between {
238 start,
239 end,
240 start_inclusive,
241 end_inclusive,
242 } => {
243 let start_literal = self.create_inferred_literal(start, &col_expr, schema);
244 let end_literal = self.create_inferred_literal(end, &col_expr, schema);
245
246 let left = if *start_inclusive {
247 col_expr.clone().gt_eq(start_literal)
248 } else {
249 col_expr.clone().gt(start_literal)
250 };
251 let right = if *end_inclusive {
252 col_expr.lt_eq(end_literal)
253 } else {
254 col_expr.lt(end_literal)
255 };
256 Ok(Some(left.and(right)))
257 }
258 log_query::ContentFilter::GreatThan { value, inclusive } => {
259 let value_literal = self.create_inferred_literal(value, &col_expr, schema);
260 let comparison_expr = if *inclusive {
261 col_expr.gt_eq(value_literal)
262 } else {
263 col_expr.gt(value_literal)
264 };
265 Ok(Some(comparison_expr))
266 }
267 log_query::ContentFilter::LessThan { value, inclusive } => {
268 let value_literal = self.create_inferred_literal(value, &col_expr, schema);
269 if *inclusive {
270 Ok(Some(col_expr.lt_eq(value_literal)))
271 } else {
272 Ok(Some(col_expr.lt(value_literal)))
273 }
274 }
275 log_query::ContentFilter::In(values) => {
276 let inferred_values: Vec<_> = values
277 .iter()
278 .map(|v| self.create_inferred_literal(v, &col_expr, schema))
279 .collect();
280 Ok(Some(col_expr.in_list(inferred_values, false)))
281 }
282 log_query::ContentFilter::IsTrue => Ok(Some(col_expr.is_true())),
283 log_query::ContentFilter::IsFalse => Ok(Some(col_expr.is_false())),
284 log_query::ContentFilter::Equal(value) => {
285 let value_literal = Self::create_eq_literal(value.clone());
286 Ok(Some(col_expr.eq(value_literal)))
287 }
288 log_query::ContentFilter::Compound(filters, op) => {
289 let exprs = filters
290 .iter()
291 .filter_map(|filter| {
292 self.build_content_filter_with_expr(col_expr.clone(), filter, schema)
293 .transpose()
294 })
295 .try_collect::<Vec<_>>()?;
296
297 if exprs.is_empty() {
298 return Ok(None);
299 }
300
301 match op {
302 log_query::ConjunctionOperator::And => Ok(conjunction(exprs)),
303 log_query::ConjunctionOperator::Or => {
304 Ok(exprs.into_iter().reduce(|a, b| a.or(b)))
306 }
307 }
308 }
309 }
310 }
311
312 fn build_aggr_func(
313 &self,
314 schema: &DFSchema,
315 fn_name: &str,
316 args: &[LogExpr],
317 by: &[LogExpr],
318 ) -> Result<(Expr, Vec<Expr>)> {
319 let aggr_fn = self
320 .session_state
321 .aggregate_functions()
322 .get(fn_name)
323 .context(UnknownAggregateFunctionSnafu {
324 name: fn_name.to_string(),
325 })?;
326 let args = args
327 .iter()
328 .map(|expr| self.log_expr_to_df_expr(expr, schema))
329 .try_collect::<Vec<_>>()?;
330 let group_exprs = by
331 .iter()
332 .map(|expr| self.log_expr_to_df_expr(expr, schema))
333 .try_collect::<Vec<_>>()?;
334 let aggr_expr = aggr_fn.call(args);
335
336 Ok((aggr_expr, group_exprs))
337 }
338
339 fn log_expr_to_df_expr(&self, expr: &LogExpr, schema: &DFSchema) -> Result<Expr> {
341 match expr {
342 LogExpr::NamedIdent(name) => Ok(col(name)),
343 LogExpr::PositionalIdent(index) => Ok(col(schema.field(*index).name())),
344 LogExpr::Literal(literal) => Ok(lit(ScalarValue::Utf8(Some(literal.clone())))),
345 LogExpr::BinaryOp { left, op, right } => {
346 self.build_binary_expr(left, op, right, schema)
348 }
349 LogExpr::ScalarFunc { name, args, alias } => {
350 self.build_scalar_func(schema, name, args, alias)
351 }
352 LogExpr::Alias { expr, alias } => {
353 let df_expr = self.log_expr_to_df_expr(expr, schema)?;
354 Ok(df_expr.alias(alias))
355 }
356 LogExpr::AggrFunc { .. } | LogExpr::Filter { .. } | LogExpr::Decompose { .. } => {
357 UnexpectedLogExprSnafu {
358 expr: expr.clone(),
359 expected: "not a typical expression",
360 }
361 .fail()
362 }
363 }
364 }
365
366 fn build_scalar_func(
367 &self,
368 schema: &DFSchema,
369 name: &str,
370 args: &[LogExpr],
371 alias: &Option<String>,
372 ) -> Result<Expr> {
373 let args = args
374 .iter()
375 .map(|expr| self.log_expr_to_df_expr(expr, schema))
376 .try_collect::<Vec<_>>()?;
377 let func = self.session_state.scalar_functions().get(name).context(
378 UnknownScalarFunctionSnafu {
379 name: name.to_string(),
380 },
381 )?;
382 let expr = func.call(args);
383
384 if let Some(alias) = alias {
385 Ok(expr.alias(alias))
386 } else {
387 Ok(expr)
388 }
389 }
390
    /// One-to-one mapping from the log-query [`BinaryOperator`] enum to the
    /// DataFusion [`Operator`] enum.
    fn binary_operator_to_df_operator(op: &BinaryOperator) -> Operator {
        match op {
            BinaryOperator::Eq => Operator::Eq,
            BinaryOperator::Ne => Operator::NotEq,
            BinaryOperator::Lt => Operator::Lt,
            BinaryOperator::Le => Operator::LtEq,
            BinaryOperator::Gt => Operator::Gt,
            BinaryOperator::Ge => Operator::GtEq,
            BinaryOperator::Plus => Operator::Plus,
            BinaryOperator::Minus => Operator::Minus,
            BinaryOperator::Multiply => Operator::Multiply,
            BinaryOperator::Divide => Operator::Divide,
            BinaryOperator::Modulo => Operator::Modulo,
            BinaryOperator::And => Operator::And,
            BinaryOperator::Or => Operator::Or,
        }
    }
409
    /// Attempts to cast a string literal to `target_type`.
    ///
    /// On cast failure the original UTF-8 scalar is kept, so the comparison
    /// still works textually instead of failing the whole plan.
    fn infer_literal_scalar_value(&self, literal: &str, target_type: &DataType) -> ScalarValue {
        let utf8_literal = ScalarValue::Utf8(Some(literal.to_string()));
        // Fall back to the raw string when the cast is not possible.
        utf8_literal.cast_to(target_type).unwrap_or(utf8_literal)
    }
416
417 fn build_binary_expr(
420 &self,
421 left: &LogExpr,
422 op: &BinaryOperator,
423 right: &LogExpr,
424 schema: &DFSchema,
425 ) -> Result<Expr> {
426 let mut left_expr = self.log_expr_to_df_expr(left, schema)?;
428 let mut right_expr = self.log_expr_to_df_expr(right, schema)?;
429
430 match (left, right) {
432 (LogExpr::Literal(_), LogExpr::Literal(_)) => {
433 }
435 (LogExpr::Literal(literal), _) => {
436 if let Ok(right_type) = right_expr.get_type(schema) {
438 let inferred_scalar = self.infer_literal_scalar_value(literal, &right_type);
439 left_expr = lit(inferred_scalar);
440 }
441 }
442 (_, LogExpr::Literal(literal)) => {
443 if let Ok(left_type) = left_expr.get_type(schema) {
445 let inferred_scalar = self.infer_literal_scalar_value(literal, &left_type);
446 right_expr = lit(inferred_scalar);
447 }
448 }
449 _ => {
450 }
452 }
453
454 let df_op = Self::binary_operator_to_df_operator(op);
455 Ok(Expr::BinaryExpr(BinaryExpr {
456 left: Box::new(left_expr),
457 op: df_op,
458 right: Box::new(right_expr),
459 }))
460 }
461
462 fn create_inferred_literal(&self, value: &str, expr: &Expr, schema: &DFSchema) -> Expr {
465 if let Ok(expr_type) = expr.get_type(schema) {
466 lit(self.infer_literal_scalar_value(value, &expr_type))
467 } else {
468 lit(ScalarValue::Utf8(Some(value.to_string())))
469 }
470 }
471
472 fn create_eq_literal(value: EqualValue) -> Expr {
473 match value {
474 EqualValue::String(s) => lit(ScalarValue::Utf8(Some(s))),
475 EqualValue::Float(n) => lit(ScalarValue::Float64(Some(n))),
476 EqualValue::Int(n) => lit(ScalarValue::Int64(Some(n))),
477 EqualValue::Boolean(b) => lit(ScalarValue::Boolean(Some(b))),
478 EqualValue::UInt(n) => lit(ScalarValue::UInt64(Some(n))),
479 }
480 }
481
482 fn process_log_expr(
486 &self,
487 plan_builder: LogicalPlanBuilder,
488 expr: &LogExpr,
489 ) -> Result<LogicalPlanBuilder> {
490 let mut plan_builder = plan_builder;
491
492 match expr {
493 LogExpr::AggrFunc {
494 name,
495 args,
496 by,
497 range: _range,
498 alias,
499 } => {
500 let schema = plan_builder.schema();
501 let (mut aggr_expr, group_exprs) = self.build_aggr_func(schema, name, args, by)?;
502 if let Some(alias) = alias {
503 aggr_expr = aggr_expr.alias(alias);
504 }
505
506 plan_builder = plan_builder
507 .aggregate(group_exprs, [aggr_expr.clone()])
508 .context(DataFusionPlanningSnafu)?;
509 }
510 LogExpr::Filter { filter } => {
511 let schema = plan_builder.schema();
512 if let Some(filter_expr) = self.build_column_filter(filter, schema.as_arrow())? {
513 plan_builder = plan_builder
514 .filter(filter_expr)
515 .context(DataFusionPlanningSnafu)?;
516 }
517 }
518 LogExpr::ScalarFunc { name, args, alias } => {
519 let schema = plan_builder.schema();
520 let expr = self.build_scalar_func(schema, name, args, alias)?;
521 plan_builder = plan_builder
522 .project([expr])
523 .context(DataFusionPlanningSnafu)?;
524 }
525 LogExpr::NamedIdent(_) | LogExpr::PositionalIdent(_) => {
526 }
528 LogExpr::Alias { expr, alias } => {
529 let schema = plan_builder.schema();
530 let df_expr = self.log_expr_to_df_expr(expr, schema)?;
531 let aliased_expr = df_expr.alias(alias);
532 plan_builder = plan_builder
533 .project([aliased_expr.clone()])
534 .context(DataFusionPlanningSnafu)?;
535 }
536 LogExpr::BinaryOp { .. } => {
537 let schema = plan_builder.schema();
538 let binary_expr = self.log_expr_to_df_expr(expr, schema)?;
539
540 plan_builder = plan_builder
541 .project([binary_expr])
542 .context(DataFusionPlanningSnafu)?;
543 }
544 _ => {
545 UnimplementedSnafu {
546 feature: "log expression",
547 }
548 .fail()?;
549 }
550 }
551 Ok(plan_builder)
552 }
553}
554
555#[cfg(test)]
556mod tests {
557 use std::sync::Arc;
558
559 use catalog::memory::MemoryCatalogManager;
560 use catalog::RegisterTableRequest;
561 use common_catalog::consts::DEFAULT_CATALOG_NAME;
562 use common_query::test_util::DummyDecoder;
563 use datafusion::execution::SessionStateBuilder;
564 use datatypes::prelude::ConcreteDataType;
565 use datatypes::schema::{ColumnSchema, SchemaRef};
566 use log_query::{
567 ColumnFilters, ConjunctionOperator, ContentFilter, Context, Filters, Limit, LogExpr,
568 };
569 use session::context::QueryContext;
570 use table::metadata::{TableInfoBuilder, TableMetaBuilder};
571 use table::table_name::TableName;
572 use table::test_util::EmptyTable;
573
574 use super::*;
575
576 fn mock_schema() -> SchemaRef {
577 let columns = vec![
578 ColumnSchema::new(
579 "message".to_string(),
580 ConcreteDataType::string_datatype(),
581 false,
582 ),
583 ColumnSchema::new(
584 "timestamp".to_string(),
585 ConcreteDataType::timestamp_millisecond_datatype(),
586 false,
587 )
588 .with_time_index(true),
589 ColumnSchema::new(
590 "host".to_string(),
591 ConcreteDataType::string_datatype(),
592 true,
593 ),
594 ColumnSchema::new(
595 "is_active".to_string(),
596 ConcreteDataType::boolean_datatype(),
597 true,
598 ),
599 ];
600
601 Arc::new(Schema::new(columns))
602 }
603
604 fn mock_schema_with_typed_columns() -> SchemaRef {
605 let columns = vec![
606 ColumnSchema::new(
607 "message".to_string(),
608 ConcreteDataType::string_datatype(),
609 false,
610 ),
611 ColumnSchema::new(
612 "timestamp".to_string(),
613 ConcreteDataType::timestamp_millisecond_datatype(),
614 false,
615 )
616 .with_time_index(true),
617 ColumnSchema::new(
618 "host".to_string(),
619 ConcreteDataType::string_datatype(),
620 true,
621 ),
622 ColumnSchema::new(
623 "is_active".to_string(),
624 ConcreteDataType::boolean_datatype(),
625 true,
626 ),
627 ColumnSchema::new("age".to_string(), ConcreteDataType::int32_datatype(), true),
629 ColumnSchema::new(
630 "score".to_string(),
631 ConcreteDataType::float64_datatype(),
632 true,
633 ),
634 ColumnSchema::new(
635 "count".to_string(),
636 ConcreteDataType::uint64_datatype(),
637 true,
638 ),
639 ];
640
641 Arc::new(Schema::new(columns))
642 }
643
    /// Registers the given `(schema, table)` pairs using the default mock schema.
    async fn build_test_table_provider(
        table_name_tuples: &[(String, String)],
    ) -> DfTableSourceProvider {
        build_test_table_provider_with_schema(table_name_tuples, mock_schema()).await
    }
650
    /// Registers the given `(schema, table)` pairs using the typed-columns
    /// mock schema.
    async fn build_test_table_provider_with_typed_columns(
        table_name_tuples: &[(String, String)],
    ) -> DfTableSourceProvider {
        build_test_table_provider_with_schema(table_name_tuples, mock_schema_with_typed_columns())
            .await
    }
658
    /// Builds a `DfTableSourceProvider` backed by an in-memory catalog with
    /// one empty table per `(schema, table)` tuple, all sharing `schema`.
    async fn build_test_table_provider_with_schema(
        table_name_tuples: &[(String, String)],
        schema: SchemaRef,
    ) -> DfTableSourceProvider {
        let catalog_list = MemoryCatalogManager::with_default_setup();
        for (schema_name, table_name) in table_name_tuples {
            // Minimal table metadata: `host` (index 2) as primary key,
            // `message` (index 0) as the value column.
            let table_meta = TableMetaBuilder::empty()
                .schema(schema.clone())
                .primary_key_indices(vec![2])
                .value_indices(vec![0])
                .next_column_id(1024)
                .build()
                .unwrap();
            let table_info = TableInfoBuilder::default()
                .name(table_name.to_string())
                .meta(table_meta)
                .build()
                .unwrap();
            let table = EmptyTable::from_table_info(&table_info);

            catalog_list
                .register_table_sync(RegisterTableRequest {
                    catalog: DEFAULT_CATALOG_NAME.to_string(),
                    schema: schema_name.to_string(),
                    table_name: table_name.to_string(),
                    table_id: 1024,
                    table,
                })
                .unwrap();
        }

        DfTableSourceProvider::new(
            catalog_list,
            false,
            QueryContext::arc(),
            DummyDecoder::arc(),
            false,
        )
    }
698
    // End-to-end: a time range plus a Contains filter lowers to
    // TableScan -> Filter -> Limit.
    #[tokio::test]
    async fn test_query_to_plan() {
        let table_provider =
            build_test_table_provider(&[("public".to_string(), "test_table".to_string())]).await;
        let session_state = SessionStateBuilder::new().with_default_features().build();
        let mut planner = LogQueryPlanner::new(table_provider, session_state);

        let log_query = LogQuery {
            table: TableName::new(DEFAULT_CATALOG_NAME, "public", "test_table"),
            time_filter: TimeFilter {
                start: Some("2021-01-01T00:00:00Z".to_string()),
                end: Some("2021-01-02T00:00:00Z".to_string()),
                span: None,
            },
            filters: Filters::Single(ColumnFilters {
                expr: Box::new(LogExpr::NamedIdent("message".to_string())),
                filters: vec![ContentFilter::Contains("error".to_string())],
            }),
            limit: Limit {
                skip: None,
                fetch: Some(100),
            },
            context: Context::None,
            columns: vec![],
            exprs: vec![],
        };

        let plan = planner.query_to_plan(log_query).await.unwrap();
        let expected = "Limit: skip=0, fetch=100 [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N, is_active:Boolean;N]\
        \n  Filter: greptime.public.test_table.timestamp >= Utf8(\"2021-01-01T00:00:00Z\") AND greptime.public.test_table.timestamp <= Utf8(\"2021-01-02T00:00:00Z\") AND greptime.public.test_table.message LIKE Utf8(\"%error%\") [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N, is_active:Boolean;N]\
        \n    TableScan: greptime.public.test_table [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N, is_active:Boolean;N]";

        assert_eq!(plan.display_indent_schema().to_string(), expected);
    }
733
    // Both bounds present: the predicate compares the time index against the
    // provided start and end strings.
    #[tokio::test]
    async fn test_build_time_filter() {
        let table_provider =
            build_test_table_provider(&[("public".to_string(), "test_table".to_string())]).await;
        let session_state = SessionStateBuilder::new().with_default_features().build();
        let planner = LogQueryPlanner::new(table_provider, session_state);

        let time_filter = TimeFilter {
            start: Some("2021-01-01T00:00:00Z".to_string()),
            end: Some("2021-01-02T00:00:00Z".to_string()),
            span: None,
        };

        let expr = planner
            .build_time_filter(&time_filter, &mock_schema())
            .unwrap();

        let expected_expr = col("timestamp")
            .gt_eq(lit(ScalarValue::Utf8(Some(
                "2021-01-01T00:00:00Z".to_string(),
            ))))
            .and(col("timestamp").lt_eq(lit(ScalarValue::Utf8(Some(
                "2021-01-02T00:00:00Z".to_string(),
            )))));

        assert_eq!(format!("{:?}", expr), format!("{:?}", expected_expr));
    }
761
    // Missing end bound: the upper bound falls back to the far-future
    // sentinel timestamp.
    #[tokio::test]
    async fn test_build_time_filter_without_end() {
        let table_provider =
            build_test_table_provider(&[("public".to_string(), "test_table".to_string())]).await;
        let session_state = SessionStateBuilder::new().with_default_features().build();
        let planner = LogQueryPlanner::new(table_provider, session_state);

        let time_filter = TimeFilter {
            start: Some("2021-01-01T00:00:00Z".to_string()),
            end: None,
            span: None,
        };

        let expr = planner
            .build_time_filter(&time_filter, &mock_schema())
            .unwrap();

        let expected_expr = col("timestamp")
            .gt_eq(lit(ScalarValue::Utf8(Some(
                "2021-01-01T00:00:00Z".to_string(),
            ))))
            .and(col("timestamp").lt_eq(lit(ScalarValue::Utf8(Some(
                "9999-12-31T23:59:59Z".to_string(),
            )))));

        assert_eq!(format!("{:?}", expr), format!("{:?}", expected_expr));
    }
789
    // Multiple content filters on one column are AND-ed together, each as a
    // LIKE pattern.
    #[tokio::test]
    async fn test_build_content_filter() {
        let table_provider =
            build_test_table_provider(&[("public".to_string(), "test_table".to_string())]).await;
        let session_state = SessionStateBuilder::new().with_default_features().build();
        let planner = LogQueryPlanner::new(table_provider, session_state);
        let schema = mock_schema();

        let column_filter = ColumnFilters {
            expr: Box::new(LogExpr::NamedIdent("message".to_string())),
            filters: vec![
                ContentFilter::Contains("error".to_string()),
                ContentFilter::Prefix("WARN".to_string()),
            ],
        };

        let expr_option = planner
            .build_column_filter(&column_filter, schema.arrow_schema())
            .unwrap();
        assert!(expr_option.is_some());

        let expr = expr_option.unwrap();

        let expected_expr = col("message")
            .like(lit(ScalarValue::Utf8(Some("%error%".to_string()))))
            .and(col("message").like(lit(ScalarValue::Utf8(Some("WARN%".to_string())))));

        assert_eq!(format!("{:?}", expr), format!("{:?}", expected_expr));
    }
819
    // Only `skip` set: fetch defaults to DEFAULT_LIMIT (1000).
    #[tokio::test]
    async fn test_query_to_plan_with_only_skip() {
        let table_provider =
            build_test_table_provider(&[("public".to_string(), "test_table".to_string())]).await;
        let session_state = SessionStateBuilder::new().with_default_features().build();
        let mut planner = LogQueryPlanner::new(table_provider, session_state);

        let log_query = LogQuery {
            table: TableName::new(DEFAULT_CATALOG_NAME, "public", "test_table"),
            time_filter: TimeFilter {
                start: Some("2021-01-01T00:00:00Z".to_string()),
                end: Some("2021-01-02T00:00:00Z".to_string()),
                span: None,
            },
            filters: Filters::Single(ColumnFilters {
                expr: Box::new(LogExpr::NamedIdent("message".to_string())),
                filters: vec![ContentFilter::Contains("error".to_string())],
            }),
            limit: Limit {
                skip: Some(10),
                fetch: None,
            },
            context: Context::None,
            columns: vec![],
            exprs: vec![],
        };

        let plan = planner.query_to_plan(log_query).await.unwrap();
        let expected = "Limit: skip=10, fetch=1000 [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N, is_active:Boolean;N]\
        \n  Filter: greptime.public.test_table.timestamp >= Utf8(\"2021-01-01T00:00:00Z\") AND greptime.public.test_table.timestamp <= Utf8(\"2021-01-02T00:00:00Z\") AND greptime.public.test_table.message LIKE Utf8(\"%error%\") [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N, is_active:Boolean;N]\
        \n    TableScan: greptime.public.test_table [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N, is_active:Boolean;N]";

        assert_eq!(plan.display_indent_schema().to_string(), expected);
    }
854
    // Neither `skip` nor `fetch` set: plan defaults to skip=0, fetch=1000.
    #[tokio::test]
    async fn test_query_to_plan_without_limit() {
        let table_provider =
            build_test_table_provider(&[("public".to_string(), "test_table".to_string())]).await;
        let session_state = SessionStateBuilder::new().with_default_features().build();
        let mut planner = LogQueryPlanner::new(table_provider, session_state);

        let log_query = LogQuery {
            table: TableName::new(DEFAULT_CATALOG_NAME, "public", "test_table"),
            time_filter: TimeFilter {
                start: Some("2021-01-01T00:00:00Z".to_string()),
                end: Some("2021-01-02T00:00:00Z".to_string()),
                span: None,
            },
            filters: Filters::Single(ColumnFilters {
                expr: Box::new(LogExpr::NamedIdent("message".to_string())),
                filters: vec![ContentFilter::Contains("error".to_string())],
            }),
            limit: Limit {
                skip: None,
                fetch: None,
            },
            context: Context::None,
            columns: vec![],
            exprs: vec![],
        };

        let plan = planner.query_to_plan(log_query).await.unwrap();
        let expected = "Limit: skip=0, fetch=1000 [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N, is_active:Boolean;N]\
        \n  Filter: greptime.public.test_table.timestamp >= Utf8(\"2021-01-01T00:00:00Z\") AND greptime.public.test_table.timestamp <= Utf8(\"2021-01-02T00:00:00Z\") AND greptime.public.test_table.message LIKE Utf8(\"%error%\") [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N, is_active:Boolean;N]\
        \n    TableScan: greptime.public.test_table [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N, is_active:Boolean;N]";

        assert_eq!(plan.display_indent_schema().to_string(), expected);
    }
889
890 #[test]
891 fn test_escape_pattern() {
892 assert_eq!(escape_like_pattern("test"), "test");
893 assert_eq!(escape_like_pattern("te%st"), "te\\%st");
894 assert_eq!(escape_like_pattern("te_st"), "te\\_st");
895 assert_eq!(escape_like_pattern("te\\st"), "te\\\\st");
896 }
897
    // An AggrFunc log expression adds an Aggregate node (with alias) on top
    // of the limited scan.
    #[tokio::test]
    async fn test_query_to_plan_with_aggr_func() {
        let table_provider =
            build_test_table_provider(&[("public".to_string(), "test_table".to_string())]).await;
        let session_state = SessionStateBuilder::new().with_default_features().build();
        let mut planner = LogQueryPlanner::new(table_provider, session_state);

        let log_query = LogQuery {
            table: TableName::new(DEFAULT_CATALOG_NAME, "public", "test_table"),
            time_filter: TimeFilter {
                start: Some("2021-01-01T00:00:00Z".to_string()),
                end: Some("2021-01-02T00:00:00Z".to_string()),
                span: None,
            },
            filters: Default::default(),
            limit: Limit {
                skip: None,
                fetch: Some(100),
            },
            context: Context::None,
            columns: vec![],
            exprs: vec![LogExpr::AggrFunc {
                name: "count".to_string(),
                args: vec![LogExpr::NamedIdent("message".to_string())],
                by: vec![LogExpr::NamedIdent("host".to_string())],
                range: None,
                alias: Some("count_result".to_string()),
            }],
        };

        let plan = planner.query_to_plan(log_query).await.unwrap();
        let expected = "Aggregate: groupBy=[[greptime.public.test_table.host]], aggr=[[count(greptime.public.test_table.message) AS count_result]] [host:Utf8;N, count_result:Int64]\
        \n  Limit: skip=0, fetch=100 [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N, is_active:Boolean;N]\
        \n    Filter: greptime.public.test_table.timestamp >= Utf8(\"2021-01-01T00:00:00Z\") AND greptime.public.test_table.timestamp <= Utf8(\"2021-01-02T00:00:00Z\") [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N, is_active:Boolean;N]\
        \n      TableScan: greptime.public.test_table [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N, is_active:Boolean;N]";

        assert_eq!(plan.display_indent_schema().to_string(), expected);
    }
936
    // A ScalarFunc log expression adds a Projection node (with alias) on top
    // of the limited scan.
    #[tokio::test]
    async fn test_query_to_plan_with_scalar_func() {
        let table_provider =
            build_test_table_provider(&[("public".to_string(), "test_table".to_string())]).await;
        let session_state = SessionStateBuilder::new().with_default_features().build();
        let mut planner = LogQueryPlanner::new(table_provider, session_state);

        let log_query = LogQuery {
            table: TableName::new(DEFAULT_CATALOG_NAME, "public", "test_table"),
            time_filter: TimeFilter {
                start: Some("2021-01-01T00:00:00Z".to_string()),
                end: Some("2021-01-02T00:00:00Z".to_string()),
                span: None,
            },
            filters: Default::default(),
            limit: Limit {
                skip: None,
                fetch: Some(100),
            },
            context: Context::None,
            columns: vec![],
            exprs: vec![LogExpr::ScalarFunc {
                name: "date_trunc".to_string(),
                args: vec![
                    LogExpr::NamedIdent("timestamp".to_string()),
                    LogExpr::Literal("day".to_string()),
                ],
                alias: Some("time_bucket".to_string()),
            }],
        };

        let plan = planner.query_to_plan(log_query).await.unwrap();
        let expected = "Projection: date_trunc(greptime.public.test_table.timestamp, Utf8(\"day\")) AS time_bucket [time_bucket:Timestamp(Nanosecond, None);N]\
        \n  Limit: skip=0, fetch=100 [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N, is_active:Boolean;N]\
        \n    Filter: greptime.public.test_table.timestamp >= Utf8(\"2021-01-01T00:00:00Z\") AND greptime.public.test_table.timestamp <= Utf8(\"2021-01-02T00:00:00Z\") [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N, is_active:Boolean;N]\
        \n      TableScan: greptime.public.test_table [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N, is_active:Boolean;N]";

        assert_eq!(plan.display_indent_schema().to_string(), expected);
    }
976
    // Between filter: inclusive start (>=) and exclusive end (<) are honored
    // independently.
    #[tokio::test]
    async fn test_build_content_filter_between() {
        let table_provider =
            build_test_table_provider(&[("public".to_string(), "test_table".to_string())]).await;
        let session_state = SessionStateBuilder::new().with_default_features().build();
        let planner = LogQueryPlanner::new(table_provider, session_state);
        let schema = mock_schema();

        let column_filter = ColumnFilters {
            expr: Box::new(LogExpr::NamedIdent("message".to_string())),
            filters: vec![ContentFilter::Between {
                start: "a".to_string(),
                end: "z".to_string(),
                start_inclusive: true,
                end_inclusive: false,
            }],
        };

        let expr_option = planner
            .build_column_filter(&column_filter, schema.arrow_schema())
            .unwrap();
        assert!(expr_option.is_some());

        let expr = expr_option.unwrap();
        let expected_expr = col("message")
            .gt_eq(lit(ScalarValue::Utf8(Some("a".to_string()))))
            .and(col("message").lt(lit(ScalarValue::Utf8(Some("z".to_string())))));

        assert_eq!(format!("{:?}", expr), format!("{:?}", expected_expr));
    }
1007
    #[tokio::test]
    async fn test_query_to_plan_with_date_histogram() {
        // End-to-end planning of an Elasticsearch-style date-histogram query:
        // a `date_bin` projection followed by a grouped `count` aggregation.
        let table_provider =
            build_test_table_provider(&[("public".to_string(), "test_table".to_string())]).await;
        let session_state = SessionStateBuilder::new().with_default_features().build();
        let mut planner = LogQueryPlanner::new(table_provider, session_state);

        let log_query = LogQuery {
            table: TableName::new(DEFAULT_CATALOG_NAME, "public", "test_table"),
            time_filter: TimeFilter {
                start: Some("2021-01-01T00:00:00Z".to_string()),
                end: Some("2021-01-02T00:00:00Z".to_string()),
                span: None,
            },
            filters: Default::default(),
            // `fetch: None` exercises the DEFAULT_LIMIT (1000) fallback,
            // visible as `fetch=1000` in the expected plan below.
            limit: Limit {
                skip: Some(0),
                fetch: None,
            },
            context: Context::None,
            columns: vec![],
            exprs: vec![
                // Bucket `timestamp` into 30-second bins, aliased with the
                // histogram's synthetic column name.
                LogExpr::ScalarFunc {
                    name: "date_bin".to_string(),
                    args: vec![
                        LogExpr::Literal("30 seconds".to_string()),
                        LogExpr::NamedIdent("timestamp".to_string()),
                    ],
                    alias: Some("2__date_histogram__time_bucket".to_string()),
                },
                // Count rows per bucket; `PositionalIdent(0)` refers to the
                // first output of the previous expression.
                LogExpr::AggrFunc {
                    name: "count".to_string(),
                    args: vec![LogExpr::PositionalIdent(0)],
                    by: vec![LogExpr::NamedIdent(
                        "2__date_histogram__time_bucket".to_string(),
                    )],
                    range: None,
                    alias: Some("count_result".to_string()),
                },
            ],
        };

        let plan = planner.query_to_plan(log_query).await.unwrap();
        // Expected rendering of `display_indent_schema`: Aggregate over
        // Projection over Limit over Filter over TableScan.
        let expected = "Aggregate: groupBy=[[2__date_histogram__time_bucket]], aggr=[[count(2__date_histogram__time_bucket) AS count_result]] [2__date_histogram__time_bucket:Timestamp(Nanosecond, None);N, count_result:Int64]\
        \n  Projection: date_bin(Utf8(\"30 seconds\"), greptime.public.test_table.timestamp) AS 2__date_histogram__time_bucket [2__date_histogram__time_bucket:Timestamp(Nanosecond, None);N]\
        \n    Limit: skip=0, fetch=1000 [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N, is_active:Boolean;N]\
        \n      Filter: greptime.public.test_table.timestamp >= Utf8(\"2021-01-01T00:00:00Z\") AND greptime.public.test_table.timestamp <= Utf8(\"2021-01-02T00:00:00Z\") [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N, is_active:Boolean;N]\
        \n        TableScan: greptime.public.test_table [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N, is_active:Boolean;N]";

        assert_eq!(plan.display_indent_schema().to_string(), expected);
    }
1059
1060 #[tokio::test]
1061 async fn test_build_compound_filter() {
1062 let table_provider =
1063 build_test_table_provider(&[("public".to_string(), "test_table".to_string())]).await;
1064 let session_state = SessionStateBuilder::new().with_default_features().build();
1065 let planner = LogQueryPlanner::new(table_provider, session_state);
1066 let schema = mock_schema();
1067
1068 let column_filter = ColumnFilters {
1070 expr: Box::new(LogExpr::NamedIdent("message".to_string())),
1071 filters: vec![
1072 ContentFilter::Contains("error".to_string()),
1073 ContentFilter::Prefix("WARN".to_string()),
1074 ],
1075 };
1076 let expr = planner
1077 .build_column_filter(&column_filter, schema.arrow_schema())
1078 .unwrap()
1079 .unwrap();
1080
1081 let expected_expr = col("message")
1082 .like(lit(ScalarValue::Utf8(Some("%error%".to_string()))))
1083 .and(col("message").like(lit(ScalarValue::Utf8(Some("WARN%".to_string())))));
1084
1085 assert_eq!(format!("{:?}", expr), format!("{:?}", expected_expr));
1086
1087 let column_filter = ColumnFilters {
1089 expr: Box::new(LogExpr::NamedIdent("message".to_string())),
1090 filters: vec![ContentFilter::Compound(
1091 vec![
1092 ContentFilter::Contains("error".to_string()),
1093 ContentFilter::Prefix("WARN".to_string()),
1094 ],
1095 ConjunctionOperator::Or,
1096 )],
1097 };
1098 let expr = planner
1099 .build_column_filter(&column_filter, schema.arrow_schema())
1100 .unwrap()
1101 .unwrap();
1102
1103 let expected_expr = col("message")
1104 .like(lit(ScalarValue::Utf8(Some("%error%".to_string()))))
1105 .or(col("message").like(lit(ScalarValue::Utf8(Some("WARN%".to_string())))));
1106
1107 assert_eq!(format!("{:?}", expr), format!("{:?}", expected_expr));
1108
1109 let column_filter = ColumnFilters {
1111 expr: Box::new(LogExpr::NamedIdent("message".to_string())),
1112 filters: vec![ContentFilter::Compound(
1113 vec![
1114 ContentFilter::Contains("error".to_string()),
1115 ContentFilter::Compound(
1116 vec![
1117 ContentFilter::Prefix("WARN".to_string()),
1118 ContentFilter::Exact("DEBUG".to_string()),
1119 ],
1120 ConjunctionOperator::Or,
1121 ),
1122 ],
1123 ConjunctionOperator::And,
1124 )],
1125 };
1126 let expr = planner
1127 .build_column_filter(&column_filter, schema.arrow_schema())
1128 .unwrap()
1129 .unwrap();
1130
1131 let expected_nested = col("message")
1132 .like(lit(ScalarValue::Utf8(Some("WARN%".to_string()))))
1133 .or(col("message").like(lit(ScalarValue::Utf8(Some("DEBUG".to_string())))));
1134 let expected_expr = col("message")
1135 .like(lit(ScalarValue::Utf8(Some("%error%".to_string()))))
1136 .and(expected_nested);
1137
1138 assert_eq!(format!("{:?}", expr), format!("{:?}", expected_expr));
1139 }
1140
1141 #[tokio::test]
1142 async fn test_build_great_than_filter() {
1143 let table_provider =
1144 build_test_table_provider(&[("public".to_string(), "test_table".to_string())]).await;
1145 let session_state = SessionStateBuilder::new().with_default_features().build();
1146 let planner = LogQueryPlanner::new(table_provider, session_state);
1147 let schema = mock_schema();
1148
1149 let column_filter = ColumnFilters {
1151 expr: Box::new(LogExpr::NamedIdent("message".to_string())),
1152 filters: vec![ContentFilter::GreatThan {
1153 value: "error".to_string(),
1154 inclusive: true,
1155 }],
1156 };
1157
1158 let expr_option = planner
1159 .build_column_filter(&column_filter, schema.arrow_schema())
1160 .unwrap();
1161 assert!(expr_option.is_some());
1162
1163 let expr = expr_option.unwrap();
1164 let expected_expr = col("message").gt_eq(lit(ScalarValue::Utf8(Some("error".to_string()))));
1165
1166 assert_eq!(format!("{:?}", expr), format!("{:?}", expected_expr));
1167
1168 let column_filter = ColumnFilters {
1170 expr: Box::new(LogExpr::NamedIdent("message".to_string())),
1171 filters: vec![ContentFilter::GreatThan {
1172 value: "error".to_string(),
1173 inclusive: false,
1174 }],
1175 };
1176
1177 let expr_option = planner
1178 .build_column_filter(&column_filter, schema.arrow_schema())
1179 .unwrap();
1180 assert!(expr_option.is_some());
1181
1182 let expr = expr_option.unwrap();
1183 let expected_expr = col("message").gt(lit(ScalarValue::Utf8(Some("error".to_string()))));
1184
1185 assert_eq!(format!("{:?}", expr), format!("{:?}", expected_expr));
1186 }
1187
1188 #[tokio::test]
1189 async fn test_build_less_than_filter() {
1190 let table_provider =
1191 build_test_table_provider(&[("public".to_string(), "test_table".to_string())]).await;
1192 let session_state = SessionStateBuilder::new().with_default_features().build();
1193 let planner = LogQueryPlanner::new(table_provider, session_state);
1194 let schema = mock_schema();
1195
1196 let column_filter = ColumnFilters {
1198 expr: Box::new(LogExpr::NamedIdent("message".to_string())),
1199 filters: vec![ContentFilter::LessThan {
1200 value: "error".to_string(),
1201 inclusive: true,
1202 }],
1203 };
1204
1205 let expr_option = planner
1206 .build_column_filter(&column_filter, schema.arrow_schema())
1207 .unwrap();
1208 assert!(expr_option.is_some());
1209
1210 let expr = expr_option.unwrap();
1211 let expected_expr = col("message").lt_eq(lit(ScalarValue::Utf8(Some("error".to_string()))));
1212
1213 assert_eq!(format!("{:?}", expr), format!("{:?}", expected_expr));
1214
1215 let column_filter = ColumnFilters {
1217 expr: Box::new(LogExpr::NamedIdent("message".to_string())),
1218 filters: vec![ContentFilter::LessThan {
1219 value: "error".to_string(),
1220 inclusive: false,
1221 }],
1222 };
1223
1224 let expr_option = planner
1225 .build_column_filter(&column_filter, schema.arrow_schema())
1226 .unwrap();
1227 assert!(expr_option.is_some());
1228
1229 let expr = expr_option.unwrap();
1230 let expected_expr = col("message").lt(lit(ScalarValue::Utf8(Some("error".to_string()))));
1231
1232 assert_eq!(format!("{:?}", expr), format!("{:?}", expected_expr));
1233 }
1234
1235 #[tokio::test]
1236 async fn test_build_in_filter() {
1237 let table_provider =
1238 build_test_table_provider(&[("public".to_string(), "test_table".to_string())]).await;
1239 let session_state = SessionStateBuilder::new().with_default_features().build();
1240 let planner = LogQueryPlanner::new(table_provider, session_state);
1241 let schema = mock_schema();
1242
1243 let column_filter = ColumnFilters {
1245 expr: Box::new(LogExpr::NamedIdent("message".to_string())),
1246 filters: vec![ContentFilter::In(vec![
1247 "error".to_string(),
1248 "warning".to_string(),
1249 "info".to_string(),
1250 ])],
1251 };
1252
1253 let expr_option = planner
1254 .build_column_filter(&column_filter, schema.arrow_schema())
1255 .unwrap();
1256 assert!(expr_option.is_some());
1257
1258 let expr = expr_option.unwrap();
1259 let expected_expr = col("message").in_list(
1260 vec![
1261 lit(ScalarValue::Utf8(Some("error".to_string()))),
1262 lit(ScalarValue::Utf8(Some("warning".to_string()))),
1263 lit(ScalarValue::Utf8(Some("info".to_string()))),
1264 ],
1265 false,
1266 );
1267
1268 assert_eq!(format!("{:?}", expr), format!("{:?}", expected_expr));
1269 }
1270
1271 #[tokio::test]
1272 async fn test_build_is_true_filter() {
1273 let table_provider =
1274 build_test_table_provider(&[("public".to_string(), "test_table".to_string())]).await;
1275 let session_state = SessionStateBuilder::new().with_default_features().build();
1276 let planner = LogQueryPlanner::new(table_provider, session_state);
1277 let schema = mock_schema();
1278
1279 let column_filter = ColumnFilters {
1281 expr: Box::new(LogExpr::NamedIdent("is_active".to_string())),
1282 filters: vec![ContentFilter::IsTrue],
1283 };
1284
1285 let expr_option = planner
1286 .build_column_filter(&column_filter, schema.arrow_schema())
1287 .unwrap();
1288 assert!(expr_option.is_some());
1289
1290 let expr = expr_option.unwrap();
1291 let expected_expr_string =
1292 "IsTrue(Column(Column { relation: None, name: \"is_active\" }))".to_string();
1293
1294 assert_eq!(format!("{:?}", expr), expected_expr_string);
1295 }
1296
1297 #[tokio::test]
1298 async fn test_build_filter_with_scalar_fn() {
1299 let table_provider =
1300 build_test_table_provider(&[("public".to_string(), "test_table".to_string())]).await;
1301 let session_state = SessionStateBuilder::new().with_default_features().build();
1302 let planner = LogQueryPlanner::new(table_provider, session_state);
1303 let schema = mock_schema();
1304
1305 let column_filter = ColumnFilters {
1306 expr: Box::new(LogExpr::BinaryOp {
1307 left: Box::new(LogExpr::ScalarFunc {
1308 name: "character_length".to_string(),
1309 args: vec![LogExpr::NamedIdent("message".to_string())],
1310 alias: None,
1311 }),
1312 op: BinaryOperator::Gt,
1313 right: Box::new(LogExpr::Literal("100".to_string())),
1314 }),
1315 filters: vec![ContentFilter::IsTrue],
1316 };
1317
1318 let expr_option = planner
1319 .build_column_filter(&column_filter, schema.arrow_schema())
1320 .unwrap();
1321 assert!(expr_option.is_some());
1322
1323 let expr = expr_option.unwrap();
1324 let expected_expr_string = "character_length(message) > Int32(100) IS TRUE";
1325
1326 assert_eq!(format!("{}", expr), expected_expr_string);
1327 }
1328
1329 #[tokio::test]
1330 async fn test_type_inference_float_comparison() {
1331 let table_provider = build_test_table_provider_with_typed_columns(&[(
1332 "public".to_string(),
1333 "test_table".to_string(),
1334 )])
1335 .await;
1336 let session_state = SessionStateBuilder::new().with_default_features().build();
1337 let planner = LogQueryPlanner::new(table_provider, session_state);
1338 let schema = mock_schema_with_typed_columns();
1339
1340 let column_filter = ColumnFilters {
1342 expr: Box::new(LogExpr::NamedIdent("score".to_string())),
1343 filters: vec![ContentFilter::Between {
1344 start: "75.5".to_string(),
1345 end: "100.0".to_string(),
1346 start_inclusive: true,
1347 end_inclusive: false,
1348 }],
1349 };
1350
1351 let expr_option = planner
1352 .build_column_filter(&column_filter, schema.arrow_schema())
1353 .unwrap();
1354 assert!(expr_option.is_some());
1355
1356 let expr = expr_option.unwrap();
1357 let expected_expr = col("score")
1359 .gt_eq(lit(ScalarValue::Float64(Some(75.5))))
1360 .and(col("score").lt(lit(ScalarValue::Float64(Some(100.0)))));
1361
1362 assert_eq!(format!("{:?}", expr), format!("{:?}", expected_expr));
1363 }
1364
1365 #[tokio::test]
1366 async fn test_type_inference_boolean_comparison() {
1367 let table_provider = build_test_table_provider_with_typed_columns(&[(
1368 "public".to_string(),
1369 "test_table".to_string(),
1370 )])
1371 .await;
1372 let session_state = SessionStateBuilder::new().with_default_features().build();
1373 let planner = LogQueryPlanner::new(table_provider, session_state);
1374 let schema = mock_schema_with_typed_columns();
1375
1376 let column_filter = ColumnFilters {
1378 expr: Box::new(LogExpr::NamedIdent("is_active".to_string())),
1379 filters: vec![ContentFilter::In(vec![
1380 "true".to_string(),
1381 "1".to_string(),
1382 "false".to_string(),
1383 ])],
1384 };
1385
1386 let expr_option = planner
1387 .build_column_filter(&column_filter, schema.arrow_schema())
1388 .unwrap();
1389 assert!(expr_option.is_some());
1390
1391 let expr = expr_option.unwrap();
1392 let expected_expr = col("is_active").in_list(
1394 vec![
1395 lit(ScalarValue::Boolean(Some(true))),
1396 lit(ScalarValue::Boolean(Some(true))),
1397 lit(ScalarValue::Boolean(Some(false))),
1398 ],
1399 false,
1400 );
1401
1402 assert_eq!(format!("{:?}", expr), format!("{:?}", expected_expr));
1403 }
1404
1405 #[tokio::test]
1406 async fn test_fallback_to_utf8_on_parse_failure() {
1407 let table_provider = build_test_table_provider_with_typed_columns(&[(
1408 "public".to_string(),
1409 "test_table".to_string(),
1410 )])
1411 .await;
1412 let session_state = SessionStateBuilder::new().with_default_features().build();
1413 let planner = LogQueryPlanner::new(table_provider, session_state);
1414 let schema = mock_schema_with_typed_columns();
1415
1416 let column_filter = ColumnFilters {
1418 expr: Box::new(LogExpr::NamedIdent("age".to_string())),
1419 filters: vec![ContentFilter::GreatThan {
1420 value: "not_a_number".to_string(),
1421 inclusive: false,
1422 }],
1423 };
1424
1425 let expr_option = planner
1426 .build_column_filter(&column_filter, schema.arrow_schema())
1427 .unwrap();
1428 assert!(expr_option.is_some());
1429
1430 let expr = expr_option.unwrap();
1431 let expected_expr = col("age").gt(lit(ScalarValue::Utf8(Some("not_a_number".to_string()))));
1433
1434 assert_eq!(format!("{:?}", expr), format!("{:?}", expected_expr));
1435 }
1436
1437 #[tokio::test]
1438 async fn test_string_column_remains_utf8() {
1439 let table_provider = build_test_table_provider_with_typed_columns(&[(
1440 "public".to_string(),
1441 "test_table".to_string(),
1442 )])
1443 .await;
1444 let session_state = SessionStateBuilder::new().with_default_features().build();
1445 let planner = LogQueryPlanner::new(table_provider, session_state);
1446 let schema = mock_schema_with_typed_columns();
1447
1448 let column_filter = ColumnFilters {
1450 expr: Box::new(LogExpr::NamedIdent("message".to_string())),
1451 filters: vec![ContentFilter::GreatThan {
1452 value: "123".to_string(),
1453 inclusive: false,
1454 }],
1455 };
1456
1457 let expr_option = planner
1458 .build_column_filter(&column_filter, schema.arrow_schema())
1459 .unwrap();
1460 assert!(expr_option.is_some());
1461
1462 let expr = expr_option.unwrap();
1463 let expected_expr = col("message").gt(lit(ScalarValue::Utf8(Some("123".to_string()))));
1465
1466 assert_eq!(format!("{:?}", expr), format!("{:?}", expected_expr));
1467 }
1468
1469 #[tokio::test]
1470 async fn test_all_binary_operators() {
1471 let table_provider = build_test_table_provider_with_typed_columns(&[(
1472 "public".to_string(),
1473 "test_table".to_string(),
1474 )])
1475 .await;
1476 let session_state = SessionStateBuilder::new().with_default_features().build();
1477 let planner = LogQueryPlanner::new(table_provider, session_state);
1478 let schema = mock_schema_with_typed_columns();
1479
1480 let df_schema = DFSchema::try_from(schema.arrow_schema().clone()).unwrap();
1481
1482 let test_cases = vec![
1484 (BinaryOperator::Eq, Operator::Eq),
1485 (BinaryOperator::Ne, Operator::NotEq),
1486 (BinaryOperator::Lt, Operator::Lt),
1487 (BinaryOperator::Le, Operator::LtEq),
1488 (BinaryOperator::Gt, Operator::Gt),
1489 (BinaryOperator::Ge, Operator::GtEq),
1490 (BinaryOperator::Plus, Operator::Plus),
1491 (BinaryOperator::Minus, Operator::Minus),
1492 (BinaryOperator::Multiply, Operator::Multiply),
1493 (BinaryOperator::Divide, Operator::Divide),
1494 (BinaryOperator::Modulo, Operator::Modulo),
1495 (BinaryOperator::And, Operator::And),
1496 (BinaryOperator::Or, Operator::Or),
1497 ];
1498
1499 for (binary_op, expected_df_op) in test_cases {
1500 let binary_expr = LogExpr::BinaryOp {
1501 left: Box::new(LogExpr::NamedIdent("age".to_string())),
1502 op: binary_op,
1503 right: Box::new(LogExpr::Literal("25".to_string())),
1504 };
1505
1506 let expr = planner
1507 .log_expr_to_df_expr(&binary_expr, &df_schema)
1508 .unwrap();
1509
1510 let expected_expr = Expr::BinaryExpr(BinaryExpr {
1511 left: Box::new(col("age")),
1512 op: expected_df_op,
1513 right: Box::new(lit(ScalarValue::Int32(Some(25)))),
1514 });
1515
1516 assert_eq!(format!("{:?}", expr), format!("{:?}", expected_expr));
1517 }
1518 }
1519
1520 #[tokio::test]
1521 async fn test_nested_binary_operations() {
1522 let table_provider = build_test_table_provider_with_typed_columns(&[(
1523 "public".to_string(),
1524 "test_table".to_string(),
1525 )])
1526 .await;
1527 let session_state = SessionStateBuilder::new().with_default_features().build();
1528 let planner = LogQueryPlanner::new(table_provider, session_state);
1529 let schema = mock_schema_with_typed_columns();
1530
1531 let df_schema = DFSchema::try_from(schema.arrow_schema().clone()).unwrap();
1532
1533 let nested_binary_expr = LogExpr::BinaryOp {
1535 left: Box::new(LogExpr::BinaryOp {
1536 left: Box::new(LogExpr::NamedIdent("age".to_string())),
1537 op: BinaryOperator::Plus,
1538 right: Box::new(LogExpr::Literal("5".to_string())),
1539 }),
1540 op: BinaryOperator::Gt,
1541 right: Box::new(LogExpr::Literal("30".to_string())),
1542 };
1543
1544 let expr = planner
1545 .log_expr_to_df_expr(&nested_binary_expr, &df_schema)
1546 .unwrap();
1547
1548 let expected_expr_debug = r#"BinaryExpr(BinaryExpr { left: BinaryExpr(BinaryExpr { left: Column(Column { relation: None, name: "age" }), op: Plus, right: Literal(Int32(5), None) }), op: Gt, right: Literal(Int32(30), None) })"#;
1550 assert_eq!(format!("{:?}", expr), expected_expr_debug);
1551 }
1552}