query/log_query/
planner.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use arrow_schema::{DataType, Schema as ArrowSchema};
16use catalog::table_source::DfTableSourceProvider;
17use common_function::utils::escape_like_pattern;
18use datafusion::datasource::DefaultTableSource;
19use datafusion::execution::SessionState;
20use datafusion_common::{DFSchema, ScalarValue};
21use datafusion_expr::utils::{conjunction, disjunction};
22use datafusion_expr::{
23    BinaryExpr, Expr, ExprSchemable, LogicalPlan, LogicalPlanBuilder, Operator, col, lit, not,
24};
25use datafusion_sql::TableReference;
26use datatypes::schema::Schema;
27use log_query::{AggFunc, BinaryOperator, EqualValue, LogExpr, LogQuery, TimeFilter};
28use snafu::{OptionExt, ResultExt};
29use table::table::adapter::DfTableProviderAdapter;
30
31use crate::log_query::error::{
32    CatalogSnafu, DataFusionPlanningSnafu, Result, TimeIndexNotFoundSnafu, UnexpectedLogExprSnafu,
33    UnimplementedSnafu, UnknownAggregateFunctionSnafu, UnknownScalarFunctionSnafu,
34    UnknownTableSnafu,
35};
36
/// Default `fetch` row count applied when the query specifies no limit.
const DEFAULT_LIMIT: usize = 1000;
38
/// Planner that turns a [`LogQuery`] into a DataFusion [`LogicalPlan`].
pub struct LogQueryPlanner {
    // Resolves table references against the catalog.
    table_provider: DfTableSourceProvider,
    // Provides the registered scalar/aggregate function registries.
    session_state: SessionState,
}
43
44impl LogQueryPlanner {
    /// Creates a planner from a table provider and a session state.
    pub fn new(table_provider: DfTableSourceProvider, session_state: SessionState) -> Self {
        Self {
            table_provider,
            session_state,
        }
    }
51
52    pub async fn query_to_plan(&mut self, query: LogQuery) -> Result<LogicalPlan> {
53        // Resolve table
54        let table_ref: TableReference = query.table.table_ref().into();
55        let table_source = self
56            .table_provider
57            .resolve_table(table_ref.clone())
58            .await
59            .context(CatalogSnafu)?;
60        let schema = table_source
61            .as_any()
62            .downcast_ref::<DefaultTableSource>()
63            .context(UnknownTableSnafu)?
64            .table_provider
65            .as_any()
66            .downcast_ref::<DfTableProviderAdapter>()
67            .context(UnknownTableSnafu)?
68            .table()
69            .schema();
70
71        // Build the initial scan plan
72        let mut plan_builder = LogicalPlanBuilder::scan(table_ref, table_source, None)
73            .context(DataFusionPlanningSnafu)?;
74        let df_schema = plan_builder.schema().clone();
75
76        // Collect filter expressions
77        let mut filters = Vec::new();
78
79        // Time filter
80        filters.push(self.build_time_filter(&query.time_filter, &schema)?);
81
82        if let Some(filters_expr) = self.build_filters(&query.filters, df_schema.as_arrow())? {
83            filters.push(filters_expr);
84        }
85
86        // Apply filters
87        if !filters.is_empty() {
88            let filter_expr = filters.into_iter().reduce(|a, b| a.and(b)).unwrap();
89            plan_builder = plan_builder
90                .filter(filter_expr)
91                .context(DataFusionPlanningSnafu)?;
92        }
93
94        // Apply projections
95        if !query.columns.is_empty() {
96            let projected_columns = query.columns.iter().map(col).collect::<Vec<_>>();
97            plan_builder = plan_builder
98                .project(projected_columns)
99                .context(DataFusionPlanningSnafu)?;
100        }
101
102        // Apply limit
103        plan_builder = plan_builder
104            .limit(
105                query.limit.skip.unwrap_or(0),
106                Some(query.limit.fetch.unwrap_or(DEFAULT_LIMIT)),
107            )
108            .context(DataFusionPlanningSnafu)?;
109
110        // Apply log expressions
111        for expr in &query.exprs {
112            plan_builder = self.process_log_expr(plan_builder, expr)?;
113        }
114
115        // Build the final plan
116        let plan = plan_builder.build().context(DataFusionPlanningSnafu)?;
117
118        Ok(plan)
119    }
120
121    fn build_time_filter(&self, time_filter: &TimeFilter, schema: &Schema) -> Result<Expr> {
122        let timestamp_col = schema
123            .timestamp_column()
124            .with_context(|| TimeIndexNotFoundSnafu {})?
125            .name
126            .clone();
127
128        let start_time = ScalarValue::Utf8(time_filter.start.clone());
129        let end_time = ScalarValue::Utf8(
130            time_filter
131                .end
132                .clone()
133                .or(Some("9999-12-31T23:59:59Z".to_string())),
134        );
135        let expr = col(timestamp_col.clone())
136            .gt_eq(lit(start_time))
137            .and(col(timestamp_col).lt_eq(lit(end_time)));
138
139        Ok(expr)
140    }
141    //disjunction
142    fn build_filters(
143        &self,
144        filters: &log_query::Filters,
145        schema: &ArrowSchema,
146    ) -> Result<Option<Expr>> {
147        match filters {
148            log_query::Filters::And(filters) => {
149                let exprs = filters
150                    .iter()
151                    .filter_map(|filter| self.build_filters(filter, schema).transpose())
152                    .try_collect::<Vec<_>>()?;
153                if exprs.is_empty() {
154                    Ok(None)
155                } else {
156                    Ok(conjunction(exprs))
157                }
158            }
159            log_query::Filters::Or(filters) => {
160                let exprs = filters
161                    .iter()
162                    .filter_map(|filter| self.build_filters(filter, schema).transpose())
163                    .try_collect::<Vec<_>>()?;
164                if exprs.is_empty() {
165                    Ok(None)
166                } else {
167                    Ok(disjunction(exprs))
168                }
169            }
170            log_query::Filters::Not(filter) => {
171                if let Some(expr) = self.build_filters(filter, schema)? {
172                    Ok(Some(not(expr)))
173                } else {
174                    Ok(None)
175                }
176            }
177            log_query::Filters::Single(column_filters) => {
178                // Build a single column filter
179                self.build_column_filter(column_filters, schema)
180            }
181        }
182    }
183
    /// Builds filter expression from ColumnFilters (new structure with expr + filters)
    ///
    /// Translates `column_filter.expr` once, applies every content filter to
    /// it, and AND-combines the results. When no content filter yields an
    /// expression, falls back to `expr IS TRUE` — NOTE(review): this assumes
    /// the expression evaluates to a boolean; confirm behavior for
    /// non-boolean columns.
    fn build_column_filter(
        &self,
        column_filter: &log_query::ColumnFilters,
        schema: &ArrowSchema,
    ) -> Result<Option<Expr>> {
        // Convert ArrowSchema to DFSchema for the more generic function
        let df_schema = DFSchema::try_from(schema.clone()).context(DataFusionPlanningSnafu)?;
        let col_expr = self.log_expr_to_df_expr(&column_filter.expr, &df_schema)?;

        let filter_exprs = column_filter
            .filters
            .iter()
            .filter_map(|filter| {
                // `transpose` drops `Ok(None)` entries while propagating errors.
                self.build_content_filter_with_expr(col_expr.clone(), filter, &df_schema)
                    .transpose()
            })
            .try_collect::<Vec<_>>()?;

        if filter_exprs.is_empty() {
            return Ok(Some(col_expr.is_true()));
        }

        // Combine all filters with AND logic
        Ok(conjunction(filter_exprs))
    }
210
211    /// Builds filter expression from a single ContentFilter using a provided column expression
212    #[allow(clippy::only_used_in_recursion)]
213    fn build_content_filter_with_expr(
214        &self,
215        col_expr: Expr,
216        filter: &log_query::ContentFilter,
217        schema: &DFSchema,
218    ) -> Result<Option<Expr>> {
219        match filter {
220            log_query::ContentFilter::Exact(value) => Ok(Some(
221                col_expr.like(lit(ScalarValue::Utf8(Some(escape_like_pattern(value))))),
222            )),
223            log_query::ContentFilter::Prefix(value) => Ok(Some(col_expr.like(lit(
224                ScalarValue::Utf8(Some(format!("{}%", escape_like_pattern(value)))),
225            )))),
226            log_query::ContentFilter::Postfix(value) => Ok(Some(col_expr.like(lit(
227                ScalarValue::Utf8(Some(format!("%{}", escape_like_pattern(value)))),
228            )))),
229            log_query::ContentFilter::Contains(value) => Ok(Some(col_expr.like(lit(
230                ScalarValue::Utf8(Some(format!("%{}%", escape_like_pattern(value)))),
231            )))),
232            log_query::ContentFilter::Regex(_pattern) => Err(UnimplementedSnafu {
233                feature: "regex filter",
234            }
235            .build()),
236            log_query::ContentFilter::Exist => Ok(Some(col_expr.is_not_null())),
237            log_query::ContentFilter::Between {
238                start,
239                end,
240                start_inclusive,
241                end_inclusive,
242            } => {
243                let start_literal = self.create_inferred_literal(start, &col_expr, schema);
244                let end_literal = self.create_inferred_literal(end, &col_expr, schema);
245
246                let left = if *start_inclusive {
247                    col_expr.clone().gt_eq(start_literal)
248                } else {
249                    col_expr.clone().gt(start_literal)
250                };
251                let right = if *end_inclusive {
252                    col_expr.lt_eq(end_literal)
253                } else {
254                    col_expr.lt(end_literal)
255                };
256                Ok(Some(left.and(right)))
257            }
258            log_query::ContentFilter::GreatThan { value, inclusive } => {
259                let value_literal = self.create_inferred_literal(value, &col_expr, schema);
260                let comparison_expr = if *inclusive {
261                    col_expr.gt_eq(value_literal)
262                } else {
263                    col_expr.gt(value_literal)
264                };
265                Ok(Some(comparison_expr))
266            }
267            log_query::ContentFilter::LessThan { value, inclusive } => {
268                let value_literal = self.create_inferred_literal(value, &col_expr, schema);
269                if *inclusive {
270                    Ok(Some(col_expr.lt_eq(value_literal)))
271                } else {
272                    Ok(Some(col_expr.lt(value_literal)))
273                }
274            }
275            log_query::ContentFilter::In(values) => {
276                let inferred_values: Vec<_> = values
277                    .iter()
278                    .map(|v| self.create_inferred_literal(v, &col_expr, schema))
279                    .collect();
280                Ok(Some(col_expr.in_list(inferred_values, false)))
281            }
282            log_query::ContentFilter::IsTrue => Ok(Some(col_expr.is_true())),
283            log_query::ContentFilter::IsFalse => Ok(Some(col_expr.is_false())),
284            log_query::ContentFilter::Equal(value) => {
285                let value_literal = Self::create_eq_literal(value.clone());
286                Ok(Some(col_expr.eq(value_literal)))
287            }
288            log_query::ContentFilter::Compound(filters, op) => {
289                let exprs = filters
290                    .iter()
291                    .filter_map(|filter| {
292                        self.build_content_filter_with_expr(col_expr.clone(), filter, schema)
293                            .transpose()
294                    })
295                    .try_collect::<Vec<_>>()?;
296
297                if exprs.is_empty() {
298                    return Ok(None);
299                }
300
301                match op {
302                    log_query::ConjunctionOperator::And => Ok(conjunction(exprs)),
303                    log_query::ConjunctionOperator::Or => {
304                        // Build a disjunction (OR) of expressions
305                        Ok(exprs.into_iter().reduce(|a, b| a.or(b)))
306                    }
307                }
308            }
309        }
310    }
311
312    fn build_aggr_func(
313        &self,
314        schema: &DFSchema,
315        expr: &[AggFunc],
316        by: &[LogExpr],
317    ) -> Result<(Vec<Expr>, Vec<Expr>)> {
318        let aggr_expr = expr
319            .iter()
320            .map(|agg_func| {
321                let AggFunc {
322                    name: fn_name,
323                    args,
324                    alias,
325                } = agg_func;
326                let aggr_fn = self
327                    .session_state
328                    .aggregate_functions()
329                    .get(fn_name)
330                    .context(UnknownAggregateFunctionSnafu {
331                        name: fn_name.to_string(),
332                    })?;
333                let args = args
334                    .iter()
335                    .map(|expr| self.log_expr_to_df_expr(expr, schema))
336                    .try_collect::<Vec<_>>()?;
337                if let Some(alias) = alias {
338                    Ok(aggr_fn.call(args).alias(alias))
339                } else {
340                    Ok(aggr_fn.call(args))
341                }
342            })
343            .try_collect::<Vec<_>>()?;
344
345        let group_exprs = by
346            .iter()
347            .map(|expr| self.log_expr_to_df_expr(expr, schema))
348            .try_collect::<Vec<_>>()?;
349
350        Ok((aggr_expr, group_exprs))
351    }
352
    /// Converts a LogExpr to a DataFusion Expr, handling all expression types.
    ///
    /// Literals are emitted as UTF8 scalars here; type inference for them
    /// happens in `build_binary_expr` / `create_inferred_literal`.
    ///
    /// # Errors
    /// `AggrFunc`, `Filter` and `Decompose` are plan-level constructs, not
    /// value expressions, and are rejected with `UnexpectedLogExpr`.
    fn log_expr_to_df_expr(&self, expr: &LogExpr, schema: &DFSchema) -> Result<Expr> {
        match expr {
            LogExpr::NamedIdent(name) => Ok(col(name)),
            // Positional references are resolved against the current schema.
            LogExpr::PositionalIdent(index) => Ok(col(schema.field(*index).name())),
            LogExpr::Literal(literal) => Ok(lit(ScalarValue::Utf8(Some(literal.clone())))),
            LogExpr::BinaryOp { left, op, right } => {
                // For binary operations, always use type inference (matches original behavior)
                self.build_binary_expr(left, op, right, schema)
            }
            LogExpr::ScalarFunc { name, args, alias } => {
                self.build_scalar_func(schema, name, args, alias)
            }
            LogExpr::Alias { expr, alias } => {
                let df_expr = self.log_expr_to_df_expr(expr, schema)?;
                Ok(df_expr.alias(alias))
            }
            LogExpr::AggrFunc { .. } | LogExpr::Filter { .. } | LogExpr::Decompose { .. } => {
                UnexpectedLogExprSnafu {
                    expr: expr.clone(),
                    expected: "not a typical expression",
                }
                .fail()
            }
        }
    }
379
380    fn build_scalar_func(
381        &self,
382        schema: &DFSchema,
383        name: &str,
384        args: &[LogExpr],
385        alias: &Option<String>,
386    ) -> Result<Expr> {
387        let args = args
388            .iter()
389            .map(|expr| self.log_expr_to_df_expr(expr, schema))
390            .try_collect::<Vec<_>>()?;
391        let func = self.session_state.scalar_functions().get(name).context(
392            UnknownScalarFunctionSnafu {
393                name: name.to_string(),
394            },
395        )?;
396        let expr = func.call(args);
397
398        if let Some(alias) = alias {
399            Ok(expr.alias(alias))
400        } else {
401            Ok(expr)
402        }
403    }
404
    /// Convert BinaryOperator to DataFusion's Operator.
    ///
    /// Total, infallible mapping — every `BinaryOperator` variant has a
    /// direct DataFusion counterpart, so the match is exhaustive by design.
    fn binary_operator_to_df_operator(op: &BinaryOperator) -> Operator {
        match op {
            BinaryOperator::Eq => Operator::Eq,
            BinaryOperator::Ne => Operator::NotEq,
            BinaryOperator::Lt => Operator::Lt,
            BinaryOperator::Le => Operator::LtEq,
            BinaryOperator::Gt => Operator::Gt,
            BinaryOperator::Ge => Operator::GtEq,
            BinaryOperator::Plus => Operator::Plus,
            BinaryOperator::Minus => Operator::Minus,
            BinaryOperator::Multiply => Operator::Multiply,
            BinaryOperator::Divide => Operator::Divide,
            BinaryOperator::Modulo => Operator::Modulo,
            BinaryOperator::And => Operator::And,
            BinaryOperator::Or => Operator::Or,
        }
    }
423
424    /// Parse a string literal to the appropriate ScalarValue based on target DataType.
425    /// Falls back to UTF8 if parsing fails or type is not supported.
426    fn infer_literal_scalar_value(&self, literal: &str, target_type: &DataType) -> ScalarValue {
427        let utf8_literal = ScalarValue::Utf8(Some(literal.to_string()));
428        utf8_literal.cast_to(target_type).unwrap_or(utf8_literal)
429    }
430
    /// Build binary expression with type inference for literals.
    /// Attempts to infer literal types from the non-literal operand's type.
    ///
    /// When exactly one side is a `LogExpr::Literal`, its UTF8 scalar is
    /// re-cast to the other operand's resolved type (best-effort: inference
    /// is silently skipped if the type cannot be resolved). When both or
    /// neither side is a literal, both expressions are used as-is.
    fn build_binary_expr(
        &self,
        left: &LogExpr,
        op: &BinaryOperator,
        right: &LogExpr,
        schema: &DFSchema,
    ) -> Result<Expr> {
        // Convert both sides to DataFusion expressions first
        let mut left_expr = self.log_expr_to_df_expr(left, schema)?;
        let mut right_expr = self.log_expr_to_df_expr(right, schema)?;

        // Try to infer literal types based on the other operand
        match (left, right) {
            (LogExpr::Literal(_), LogExpr::Literal(_)) => {
                // both are literal, do nothing
            }
            (LogExpr::Literal(literal), _) => {
                // Left is literal, try to infer from right
                if let Ok(right_type) = right_expr.get_type(schema) {
                    let inferred_scalar = self.infer_literal_scalar_value(literal, &right_type);
                    left_expr = lit(inferred_scalar);
                }
            }
            (_, LogExpr::Literal(literal)) => {
                // Right is literal, try to infer from left
                if let Ok(left_type) = left_expr.get_type(schema) {
                    let inferred_scalar = self.infer_literal_scalar_value(literal, &left_type);
                    right_expr = lit(inferred_scalar);
                }
            }
            _ => {
                // Neither is a simple literal, no type inference needed
            }
        }

        let df_op = Self::binary_operator_to_df_operator(op);
        Ok(Expr::BinaryExpr(BinaryExpr {
            left: Box::new(left_expr),
            op: df_op,
            right: Box::new(right_expr),
        }))
    }
475
476    /// Create a type-inferred literal based on the provided expression's type.
477    /// Falls back to UTF8 if type inference fails.
478    fn create_inferred_literal(&self, value: &str, expr: &Expr, schema: &DFSchema) -> Expr {
479        if let Ok(expr_type) = expr.get_type(schema) {
480            lit(self.infer_literal_scalar_value(value, &expr_type))
481        } else {
482            lit(ScalarValue::Utf8(Some(value.to_string())))
483        }
484    }
485
    /// Maps a typed [`EqualValue`] to the matching DataFusion literal, so
    /// equality filters need no string-based type inference.
    fn create_eq_literal(value: EqualValue) -> Expr {
        match value {
            EqualValue::String(s) => lit(ScalarValue::Utf8(Some(s))),
            EqualValue::Float(n) => lit(ScalarValue::Float64(Some(n))),
            EqualValue::Int(n) => lit(ScalarValue::Int64(Some(n))),
            EqualValue::Boolean(b) => lit(ScalarValue::Boolean(Some(b))),
            EqualValue::UInt(n) => lit(ScalarValue::UInt64(Some(n))),
        }
    }
495
    /// Process LogExpr recursively.
    ///
    /// Applies the plan operator corresponding to `expr` (aggregate, filter,
    /// or projection) on top of `plan_builder` and returns the updated
    /// [`LogicalPlanBuilder`]. Plain identifiers are a no-op.
    fn process_log_expr(
        &self,
        plan_builder: LogicalPlanBuilder,
        expr: &LogExpr,
    ) -> Result<LogicalPlanBuilder> {
        let mut plan_builder = plan_builder;

        match expr {
            LogExpr::AggrFunc { expr, by } => {
                let schema = plan_builder.schema();
                let (aggr_expr, group_exprs) = self.build_aggr_func(schema, expr, by)?;

                plan_builder = plan_builder
                    .aggregate(group_exprs, aggr_expr)
                    .context(DataFusionPlanningSnafu)?;
            }
            LogExpr::Filter { filter } => {
                let schema = plan_builder.schema();
                // A filter that produces no expression leaves the plan unchanged.
                if let Some(filter_expr) = self.build_column_filter(filter, schema.as_arrow())? {
                    plan_builder = plan_builder
                        .filter(filter_expr)
                        .context(DataFusionPlanningSnafu)?;
                }
            }
            LogExpr::ScalarFunc { name, args, alias } => {
                let schema = plan_builder.schema();
                let expr = self.build_scalar_func(schema, name, args, alias)?;
                // NOTE: projects to the single function result column.
                plan_builder = plan_builder
                    .project([expr])
                    .context(DataFusionPlanningSnafu)?;
            }
            LogExpr::NamedIdent(_) | LogExpr::PositionalIdent(_) => {
                // Bare identifiers add no plan node; nothing to do.
            }
            LogExpr::Alias { expr, alias } => {
                let schema = plan_builder.schema();
                let df_expr = self.log_expr_to_df_expr(expr, schema)?;
                let aliased_expr = df_expr.alias(alias);
                plan_builder = plan_builder
                    .project([aliased_expr.clone()])
                    .context(DataFusionPlanningSnafu)?;
            }
            LogExpr::BinaryOp { .. } => {
                let schema = plan_builder.schema();
                let binary_expr = self.log_expr_to_df_expr(expr, schema)?;

                plan_builder = plan_builder
                    .project([binary_expr])
                    .context(DataFusionPlanningSnafu)?;
            }
            _ => {
                // Remaining variants (e.g. `Literal`, `Decompose`) are not
                // supported as top-level plan transformations yet.
                UnimplementedSnafu {
                    feature: "log expression",
                }
                .fail()?;
            }
        }
        Ok(plan_builder)
    }
558}
559
560#[cfg(test)]
561mod tests {
562    use std::sync::Arc;
563
564    use catalog::RegisterTableRequest;
565    use catalog::memory::MemoryCatalogManager;
566    use common_catalog::consts::DEFAULT_CATALOG_NAME;
567    use common_query::test_util::DummyDecoder;
568    use datafusion::execution::SessionStateBuilder;
569    use datatypes::prelude::ConcreteDataType;
570    use datatypes::schema::{ColumnSchema, SchemaRef};
571    use log_query::{
572        ColumnFilters, ConjunctionOperator, ContentFilter, Context, Filters, Limit, LogExpr,
573    };
574    use session::context::QueryContext;
575    use table::metadata::{TableInfoBuilder, TableMetaBuilder};
576    use table::table_name::TableName;
577    use table::test_util::EmptyTable;
578
579    use super::*;
580
581    fn mock_schema() -> SchemaRef {
582        let columns = vec![
583            ColumnSchema::new(
584                "message".to_string(),
585                ConcreteDataType::string_datatype(),
586                false,
587            ),
588            ColumnSchema::new(
589                "timestamp".to_string(),
590                ConcreteDataType::timestamp_millisecond_datatype(),
591                false,
592            )
593            .with_time_index(true),
594            ColumnSchema::new(
595                "host".to_string(),
596                ConcreteDataType::string_datatype(),
597                true,
598            ),
599            ColumnSchema::new(
600                "is_active".to_string(),
601                ConcreteDataType::boolean_datatype(),
602                true,
603            ),
604        ];
605
606        Arc::new(Schema::new(columns))
607    }
608
609    fn mock_schema_with_typed_columns() -> SchemaRef {
610        let columns = vec![
611            ColumnSchema::new(
612                "message".to_string(),
613                ConcreteDataType::string_datatype(),
614                false,
615            ),
616            ColumnSchema::new(
617                "timestamp".to_string(),
618                ConcreteDataType::timestamp_millisecond_datatype(),
619                false,
620            )
621            .with_time_index(true),
622            ColumnSchema::new(
623                "host".to_string(),
624                ConcreteDataType::string_datatype(),
625                true,
626            ),
627            ColumnSchema::new(
628                "is_active".to_string(),
629                ConcreteDataType::boolean_datatype(),
630                true,
631            ),
632            // Add more typed columns for comprehensive testing
633            ColumnSchema::new("age".to_string(), ConcreteDataType::int32_datatype(), true),
634            ColumnSchema::new(
635                "score".to_string(),
636                ConcreteDataType::float64_datatype(),
637                true,
638            ),
639            ColumnSchema::new(
640                "count".to_string(),
641                ConcreteDataType::uint64_datatype(),
642                true,
643            ),
644        ];
645
646        Arc::new(Schema::new(columns))
647    }
648
    /// Registers table under `greptime`, with `message`, `timestamp`, `host`, and `is_active` columns.
    ///
    /// Thin wrapper over `build_test_table_provider_with_schema` using the
    /// base mock schema.
    async fn build_test_table_provider(
        table_name_tuples: &[(String, String)],
    ) -> DfTableSourceProvider {
        build_test_table_provider_with_schema(table_name_tuples, mock_schema()).await
    }
655
    /// Registers table under `greptime`, with typed columns for type inference tests.
    ///
    /// Same as `build_test_table_provider` but with the extended typed schema.
    async fn build_test_table_provider_with_typed_columns(
        table_name_tuples: &[(String, String)],
    ) -> DfTableSourceProvider {
        build_test_table_provider_with_schema(table_name_tuples, mock_schema_with_typed_columns())
            .await
    }
663
    /// Registers one empty table per `(schema, table)` tuple under the
    /// default catalog using the given schema, and wraps the catalog in a
    /// `DfTableSourceProvider` for planner tests.
    async fn build_test_table_provider_with_schema(
        table_name_tuples: &[(String, String)],
        schema: SchemaRef,
    ) -> DfTableSourceProvider {
        let catalog_list = MemoryCatalogManager::with_default_setup();
        for (schema_name, table_name) in table_name_tuples {
            // Minimal table metadata: `host` as primary key, `message` as value.
            let table_meta = TableMetaBuilder::empty()
                .schema(schema.clone())
                .primary_key_indices(vec![2])
                .value_indices(vec![0])
                .next_column_id(1024)
                .build()
                .unwrap();
            let table_info = TableInfoBuilder::default()
                .name(table_name.to_string())
                .meta(table_meta)
                .build()
                .unwrap();
            let table = EmptyTable::from_table_info(&table_info);

            catalog_list
                .register_table_sync(RegisterTableRequest {
                    catalog: DEFAULT_CATALOG_NAME.to_string(),
                    schema: schema_name.to_string(),
                    table_name: table_name.to_string(),
                    table_id: 1024,
                    table,
                })
                .unwrap();
        }

        DfTableSourceProvider::new(
            catalog_list,
            false,
            QueryContext::arc(),
            DummyDecoder::arc(),
            false,
        )
    }
703
    // End-to-end: a simple contains-filter query plans to
    // TableScan -> Filter (time range + LIKE) -> Limit.
    #[tokio::test]
    async fn test_query_to_plan() {
        let table_provider =
            build_test_table_provider(&[("public".to_string(), "test_table".to_string())]).await;
        let session_state = SessionStateBuilder::new().with_default_features().build();
        let mut planner = LogQueryPlanner::new(table_provider, session_state);

        let log_query = LogQuery {
            table: TableName::new(DEFAULT_CATALOG_NAME, "public", "test_table"),
            time_filter: TimeFilter {
                start: Some("2021-01-01T00:00:00Z".to_string()),
                end: Some("2021-01-02T00:00:00Z".to_string()),
                span: None,
            },
            filters: Filters::Single(ColumnFilters {
                expr: Box::new(LogExpr::NamedIdent("message".to_string())),
                filters: vec![ContentFilter::Contains("error".to_string())],
            }),
            limit: Limit {
                skip: None,
                fetch: Some(100),
            },
            context: Context::None,
            columns: vec![],
            exprs: vec![],
        };

        let plan = planner.query_to_plan(log_query).await.unwrap();
        // Expected plan text must match `display_indent_schema` exactly.
        let expected = "Limit: skip=0, fetch=100 [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N, is_active:Boolean;N]\
\n  Filter: greptime.public.test_table.timestamp >= Utf8(\"2021-01-01T00:00:00Z\") AND greptime.public.test_table.timestamp <= Utf8(\"2021-01-02T00:00:00Z\") AND greptime.public.test_table.message LIKE Utf8(\"%error%\") [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N, is_active:Boolean;N]\
\n    TableScan: greptime.public.test_table [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N, is_active:Boolean;N]";

        assert_eq!(plan.display_indent_schema().to_string(), expected);
    }
738
    // Both bounds present: expect `ts >= start AND ts <= end` as UTF8 literals.
    #[tokio::test]
    async fn test_build_time_filter() {
        let table_provider =
            build_test_table_provider(&[("public".to_string(), "test_table".to_string())]).await;
        let session_state = SessionStateBuilder::new().with_default_features().build();
        let planner = LogQueryPlanner::new(table_provider, session_state);

        let time_filter = TimeFilter {
            start: Some("2021-01-01T00:00:00Z".to_string()),
            end: Some("2021-01-02T00:00:00Z".to_string()),
            span: None,
        };

        let expr = planner
            .build_time_filter(&time_filter, &mock_schema())
            .unwrap();

        let expected_expr = col("timestamp")
            .gt_eq(lit(ScalarValue::Utf8(Some(
                "2021-01-01T00:00:00Z".to_string(),
            ))))
            .and(col("timestamp").lt_eq(lit(ScalarValue::Utf8(Some(
                "2021-01-02T00:00:00Z".to_string(),
            )))));

        // Compare via Debug formatting since Expr equality is structural.
        assert_eq!(format!("{:?}", expr), format!("{:?}", expected_expr));
    }
766
    // Missing `end` bound: the far-future default timestamp is substituted.
    #[tokio::test]
    async fn test_build_time_filter_without_end() {
        let table_provider =
            build_test_table_provider(&[("public".to_string(), "test_table".to_string())]).await;
        let session_state = SessionStateBuilder::new().with_default_features().build();
        let planner = LogQueryPlanner::new(table_provider, session_state);

        let time_filter = TimeFilter {
            start: Some("2021-01-01T00:00:00Z".to_string()),
            end: None,
            span: None,
        };

        let expr = planner
            .build_time_filter(&time_filter, &mock_schema())
            .unwrap();

        let expected_expr = col("timestamp")
            .gt_eq(lit(ScalarValue::Utf8(Some(
                "2021-01-01T00:00:00Z".to_string(),
            ))))
            .and(col("timestamp").lt_eq(lit(ScalarValue::Utf8(Some(
                "9999-12-31T23:59:59Z".to_string(),
            )))));

        assert_eq!(format!("{:?}", expr), format!("{:?}", expected_expr));
    }
794
795    #[tokio::test]
796    async fn test_build_content_filter() {
797        let table_provider =
798            build_test_table_provider(&[("public".to_string(), "test_table".to_string())]).await;
799        let session_state = SessionStateBuilder::new().with_default_features().build();
800        let planner = LogQueryPlanner::new(table_provider, session_state);
801        let schema = mock_schema();
802
803        let column_filter = ColumnFilters {
804            expr: Box::new(LogExpr::NamedIdent("message".to_string())),
805            filters: vec![
806                ContentFilter::Contains("error".to_string()),
807                ContentFilter::Prefix("WARN".to_string()),
808            ],
809        };
810
811        let expr_option = planner
812            .build_column_filter(&column_filter, schema.arrow_schema())
813            .unwrap();
814        assert!(expr_option.is_some());
815
816        let expr = expr_option.unwrap();
817
818        let expected_expr = col("message")
819            .like(lit(ScalarValue::Utf8(Some("%error%".to_string()))))
820            .and(col("message").like(lit(ScalarValue::Utf8(Some("WARN%".to_string())))));
821
822        assert_eq!(format!("{:?}", expr), format!("{:?}", expected_expr));
823    }
824
    #[tokio::test]
    async fn test_query_to_plan_with_only_skip() {
        let table_provider =
            build_test_table_provider(&[("public".to_string(), "test_table".to_string())]).await;
        let session_state = SessionStateBuilder::new().with_default_features().build();
        let mut planner = LogQueryPlanner::new(table_provider, session_state);

        // `skip` is set but `fetch` is not: the plan should keep skip=10 and
        // fall back to the default fetch of 1000 (DEFAULT_LIMIT).
        let log_query = LogQuery {
            table: TableName::new(DEFAULT_CATALOG_NAME, "public", "test_table"),
            time_filter: TimeFilter {
                start: Some("2021-01-01T00:00:00Z".to_string()),
                end: Some("2021-01-02T00:00:00Z".to_string()),
                span: None,
            },
            filters: Filters::Single(ColumnFilters {
                expr: Box::new(LogExpr::NamedIdent("message".to_string())),
                filters: vec![ContentFilter::Contains("error".to_string())],
            }),
            limit: Limit {
                skip: Some(10),
                fetch: None,
            },
            context: Context::None,
            columns: vec![],
            exprs: vec![],
        };

        let plan = planner.query_to_plan(log_query).await.unwrap();
        // Expected shape: Limit over Filter (time range AND content LIKE) over TableScan.
        let expected = "Limit: skip=10, fetch=1000 [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N, is_active:Boolean;N]\
\n  Filter: greptime.public.test_table.timestamp >= Utf8(\"2021-01-01T00:00:00Z\") AND greptime.public.test_table.timestamp <= Utf8(\"2021-01-02T00:00:00Z\") AND greptime.public.test_table.message LIKE Utf8(\"%error%\") [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N, is_active:Boolean;N]\
\n    TableScan: greptime.public.test_table [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N, is_active:Boolean;N]";

        assert_eq!(plan.display_indent_schema().to_string(), expected);
    }
859
    #[tokio::test]
    async fn test_query_to_plan_without_limit() {
        let table_provider =
            build_test_table_provider(&[("public".to_string(), "test_table".to_string())]).await;
        let session_state = SessionStateBuilder::new().with_default_features().build();
        let mut planner = LogQueryPlanner::new(table_provider, session_state);

        // Neither `skip` nor `fetch` is supplied: the plan should default to
        // skip=0 and fetch=1000 (DEFAULT_LIMIT).
        let log_query = LogQuery {
            table: TableName::new(DEFAULT_CATALOG_NAME, "public", "test_table"),
            time_filter: TimeFilter {
                start: Some("2021-01-01T00:00:00Z".to_string()),
                end: Some("2021-01-02T00:00:00Z".to_string()),
                span: None,
            },
            filters: Filters::Single(ColumnFilters {
                expr: Box::new(LogExpr::NamedIdent("message".to_string())),
                filters: vec![ContentFilter::Contains("error".to_string())],
            }),
            limit: Limit {
                skip: None,
                fetch: None,
            },
            context: Context::None,
            columns: vec![],
            exprs: vec![],
        };

        let plan = planner.query_to_plan(log_query).await.unwrap();
        // Expected shape: Limit over Filter (time range AND content LIKE) over TableScan.
        let expected = "Limit: skip=0, fetch=1000 [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N, is_active:Boolean;N]\
\n  Filter: greptime.public.test_table.timestamp >= Utf8(\"2021-01-01T00:00:00Z\") AND greptime.public.test_table.timestamp <= Utf8(\"2021-01-02T00:00:00Z\") AND greptime.public.test_table.message LIKE Utf8(\"%error%\") [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N, is_active:Boolean;N]\
\n    TableScan: greptime.public.test_table [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N, is_active:Boolean;N]";

        assert_eq!(plan.display_indent_schema().to_string(), expected);
    }
894
895    #[test]
896    fn test_escape_pattern() {
897        assert_eq!(escape_like_pattern("test"), "test");
898        assert_eq!(escape_like_pattern("te%st"), "te\\%st");
899        assert_eq!(escape_like_pattern("te_st"), "te\\_st");
900        assert_eq!(escape_like_pattern("te\\st"), "te\\\\st");
901    }
902
    #[tokio::test]
    async fn test_query_to_plan_with_aggr_func() {
        let table_provider =
            build_test_table_provider(&[("public".to_string(), "test_table".to_string())]).await;
        let session_state = SessionStateBuilder::new().with_default_features().build();
        let mut planner = LogQueryPlanner::new(table_provider, session_state);

        // A `count(message)` aggregation aliased to `count_result`, grouped
        // by `host`, applied on top of the base scan/filter/limit plan.
        let log_query = LogQuery {
            table: TableName::new(DEFAULT_CATALOG_NAME, "public", "test_table"),
            time_filter: TimeFilter {
                start: Some("2021-01-01T00:00:00Z".to_string()),
                end: Some("2021-01-02T00:00:00Z".to_string()),
                span: None,
            },
            filters: Default::default(),
            limit: Limit {
                skip: None,
                fetch: Some(100),
            },
            context: Context::None,
            columns: vec![],
            exprs: vec![LogExpr::AggrFunc {
                expr: vec![AggFunc::new(
                    "count".to_string(),
                    vec![LogExpr::NamedIdent("message".to_string())],
                    Some("count_result".to_string()),
                )],
                by: vec![LogExpr::NamedIdent("host".to_string())],
            }],
        };

        let plan = planner.query_to_plan(log_query).await.unwrap();
        // The Aggregate sits above the Limit, i.e. the aggregation runs over
        // the (at most 100) rows selected by the base query.
        let expected = "Aggregate: groupBy=[[greptime.public.test_table.host]], aggr=[[count(greptime.public.test_table.message) AS count_result]] [host:Utf8;N, count_result:Int64]\
\n  Limit: skip=0, fetch=100 [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N, is_active:Boolean;N]\
\n    Filter: greptime.public.test_table.timestamp >= Utf8(\"2021-01-01T00:00:00Z\") AND greptime.public.test_table.timestamp <= Utf8(\"2021-01-02T00:00:00Z\") [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N, is_active:Boolean;N]\
\n      TableScan: greptime.public.test_table [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N, is_active:Boolean;N]";

        assert_eq!(plan.display_indent_schema().to_string(), expected);
    }
942
    #[tokio::test]
    async fn test_query_to_plan_with_scalar_func() {
        let table_provider =
            build_test_table_provider(&[("public".to_string(), "test_table".to_string())]).await;
        let session_state = SessionStateBuilder::new().with_default_features().build();
        let mut planner = LogQueryPlanner::new(table_provider, session_state);

        // A scalar function expression `date_trunc(timestamp, 'day')` aliased
        // to `time_bucket`, which should become a Projection over the base plan.
        let log_query = LogQuery {
            table: TableName::new(DEFAULT_CATALOG_NAME, "public", "test_table"),
            time_filter: TimeFilter {
                start: Some("2021-01-01T00:00:00Z".to_string()),
                end: Some("2021-01-02T00:00:00Z".to_string()),
                span: None,
            },
            filters: Default::default(),
            limit: Limit {
                skip: None,
                fetch: Some(100),
            },
            context: Context::None,
            columns: vec![],
            exprs: vec![LogExpr::ScalarFunc {
                name: "date_trunc".to_string(),
                args: vec![
                    LogExpr::NamedIdent("timestamp".to_string()),
                    LogExpr::Literal("day".to_string()),
                ],
                alias: Some("time_bucket".to_string()),
            }],
        };

        let plan = planner.query_to_plan(log_query).await.unwrap();
        // Projection on top; the output schema only keeps the aliased column.
        // (The `\` continuations strip the following indentation from the literal.)
        let expected = "Projection: date_trunc(greptime.public.test_table.timestamp, Utf8(\"day\")) AS time_bucket [time_bucket:Timestamp(Nanosecond, None);N]\
        \n  Limit: skip=0, fetch=100 [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N, is_active:Boolean;N]\
        \n    Filter: greptime.public.test_table.timestamp >= Utf8(\"2021-01-01T00:00:00Z\") AND greptime.public.test_table.timestamp <= Utf8(\"2021-01-02T00:00:00Z\") [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N, is_active:Boolean;N]\
        \n      TableScan: greptime.public.test_table [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N, is_active:Boolean;N]";

        assert_eq!(plan.display_indent_schema().to_string(), expected);
    }
982
983    #[tokio::test]
984    async fn test_build_content_filter_between() {
985        let table_provider =
986            build_test_table_provider(&[("public".to_string(), "test_table".to_string())]).await;
987        let session_state = SessionStateBuilder::new().with_default_features().build();
988        let planner = LogQueryPlanner::new(table_provider, session_state);
989        let schema = mock_schema();
990
991        let column_filter = ColumnFilters {
992            expr: Box::new(LogExpr::NamedIdent("message".to_string())),
993            filters: vec![ContentFilter::Between {
994                start: "a".to_string(),
995                end: "z".to_string(),
996                start_inclusive: true,
997                end_inclusive: false,
998            }],
999        };
1000
1001        let expr_option = planner
1002            .build_column_filter(&column_filter, schema.arrow_schema())
1003            .unwrap();
1004        assert!(expr_option.is_some());
1005
1006        let expr = expr_option.unwrap();
1007        let expected_expr = col("message")
1008            .gt_eq(lit(ScalarValue::Utf8(Some("a".to_string()))))
1009            .and(col("message").lt(lit(ScalarValue::Utf8(Some("z".to_string())))));
1010
1011        assert_eq!(format!("{:?}", expr), format!("{:?}", expected_expr));
1012    }
1013
    #[tokio::test]
    async fn test_query_to_plan_with_date_histogram() {
        let table_provider =
            build_test_table_provider(&[("public".to_string(), "test_table".to_string())]).await;
        let session_state = SessionStateBuilder::new().with_default_features().build();
        let mut planner = LogQueryPlanner::new(table_provider, session_state);

        // A date-histogram style query: first bucket timestamps with
        // `date_bin('30 seconds', timestamp)`, then count rows per bucket.
        let log_query = LogQuery {
            table: TableName::new(DEFAULT_CATALOG_NAME, "public", "test_table"),
            time_filter: TimeFilter {
                start: Some("2021-01-01T00:00:00Z".to_string()),
                end: Some("2021-01-02T00:00:00Z".to_string()),
                span: None,
            },
            filters: Default::default(),
            limit: Limit {
                skip: Some(0),
                fetch: None,
            },
            context: Context::None,
            columns: vec![],
            exprs: vec![
                LogExpr::ScalarFunc {
                    name: "date_bin".to_string(),
                    args: vec![
                        LogExpr::Literal("30 seconds".to_string()),
                        LogExpr::NamedIdent("timestamp".to_string()),
                    ],
                    alias: Some("2__date_histogram__time_bucket".to_string()),
                },
                LogExpr::AggrFunc {
                    expr: vec![AggFunc::new(
                        "count".to_string(),
                        // PositionalIdent(0) refers back to the first expr's
                        // output, i.e. the date_bin bucket column.
                        vec![LogExpr::PositionalIdent(0)],
                        Some("count_result".to_string()),
                    )],
                    by: vec![LogExpr::NamedIdent(
                        "2__date_histogram__time_bucket".to_string(),
                    )],
                },
            ],
        };

        let plan = planner.query_to_plan(log_query).await.unwrap();
        // Aggregate over the date_bin Projection; exprs are applied in order
        // on top of the base scan/filter/limit plan.
        let expected = "Aggregate: groupBy=[[2__date_histogram__time_bucket]], aggr=[[count(2__date_histogram__time_bucket) AS count_result]] [2__date_histogram__time_bucket:Timestamp(Nanosecond, None);N, count_result:Int64]\
\n  Projection: date_bin(Utf8(\"30 seconds\"), greptime.public.test_table.timestamp) AS 2__date_histogram__time_bucket [2__date_histogram__time_bucket:Timestamp(Nanosecond, None);N]\
\n    Limit: skip=0, fetch=1000 [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N, is_active:Boolean;N]\
\n      Filter: greptime.public.test_table.timestamp >= Utf8(\"2021-01-01T00:00:00Z\") AND greptime.public.test_table.timestamp <= Utf8(\"2021-01-02T00:00:00Z\") [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N, is_active:Boolean;N]\
\n        TableScan: greptime.public.test_table [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N, is_active:Boolean;N]";

        assert_eq!(plan.display_indent_schema().to_string(), expected);
    }
1066
1067    #[tokio::test]
1068    async fn test_build_compound_filter() {
1069        let table_provider =
1070            build_test_table_provider(&[("public".to_string(), "test_table".to_string())]).await;
1071        let session_state = SessionStateBuilder::new().with_default_features().build();
1072        let planner = LogQueryPlanner::new(table_provider, session_state);
1073        let schema = mock_schema();
1074
1075        // Test AND compound
1076        let column_filter = ColumnFilters {
1077            expr: Box::new(LogExpr::NamedIdent("message".to_string())),
1078            filters: vec![
1079                ContentFilter::Contains("error".to_string()),
1080                ContentFilter::Prefix("WARN".to_string()),
1081            ],
1082        };
1083        let expr = planner
1084            .build_column_filter(&column_filter, schema.arrow_schema())
1085            .unwrap()
1086            .unwrap();
1087
1088        let expected_expr = col("message")
1089            .like(lit(ScalarValue::Utf8(Some("%error%".to_string()))))
1090            .and(col("message").like(lit(ScalarValue::Utf8(Some("WARN%".to_string())))));
1091
1092        assert_eq!(format!("{:?}", expr), format!("{:?}", expected_expr));
1093
1094        // Test OR compound - use Compound filter for OR logic
1095        let column_filter = ColumnFilters {
1096            expr: Box::new(LogExpr::NamedIdent("message".to_string())),
1097            filters: vec![ContentFilter::Compound(
1098                vec![
1099                    ContentFilter::Contains("error".to_string()),
1100                    ContentFilter::Prefix("WARN".to_string()),
1101                ],
1102                ConjunctionOperator::Or,
1103            )],
1104        };
1105        let expr = planner
1106            .build_column_filter(&column_filter, schema.arrow_schema())
1107            .unwrap()
1108            .unwrap();
1109
1110        let expected_expr = col("message")
1111            .like(lit(ScalarValue::Utf8(Some("%error%".to_string()))))
1112            .or(col("message").like(lit(ScalarValue::Utf8(Some("WARN%".to_string())))));
1113
1114        assert_eq!(format!("{:?}", expr), format!("{:?}", expected_expr));
1115
1116        // Test nested compound
1117        let column_filter = ColumnFilters {
1118            expr: Box::new(LogExpr::NamedIdent("message".to_string())),
1119            filters: vec![ContentFilter::Compound(
1120                vec![
1121                    ContentFilter::Contains("error".to_string()),
1122                    ContentFilter::Compound(
1123                        vec![
1124                            ContentFilter::Prefix("WARN".to_string()),
1125                            ContentFilter::Exact("DEBUG".to_string()),
1126                        ],
1127                        ConjunctionOperator::Or,
1128                    ),
1129                ],
1130                ConjunctionOperator::And,
1131            )],
1132        };
1133        let expr = planner
1134            .build_column_filter(&column_filter, schema.arrow_schema())
1135            .unwrap()
1136            .unwrap();
1137
1138        let expected_nested = col("message")
1139            .like(lit(ScalarValue::Utf8(Some("WARN%".to_string()))))
1140            .or(col("message").like(lit(ScalarValue::Utf8(Some("DEBUG".to_string())))));
1141        let expected_expr = col("message")
1142            .like(lit(ScalarValue::Utf8(Some("%error%".to_string()))))
1143            .and(expected_nested);
1144
1145        assert_eq!(format!("{:?}", expr), format!("{:?}", expected_expr));
1146    }
1147
1148    #[tokio::test]
1149    async fn test_build_great_than_filter() {
1150        let table_provider =
1151            build_test_table_provider(&[("public".to_string(), "test_table".to_string())]).await;
1152        let session_state = SessionStateBuilder::new().with_default_features().build();
1153        let planner = LogQueryPlanner::new(table_provider, session_state);
1154        let schema = mock_schema();
1155
1156        // Test GreatThan with inclusive=true
1157        let column_filter = ColumnFilters {
1158            expr: Box::new(LogExpr::NamedIdent("message".to_string())),
1159            filters: vec![ContentFilter::GreatThan {
1160                value: "error".to_string(),
1161                inclusive: true,
1162            }],
1163        };
1164
1165        let expr_option = planner
1166            .build_column_filter(&column_filter, schema.arrow_schema())
1167            .unwrap();
1168        assert!(expr_option.is_some());
1169
1170        let expr = expr_option.unwrap();
1171        let expected_expr = col("message").gt_eq(lit(ScalarValue::Utf8(Some("error".to_string()))));
1172
1173        assert_eq!(format!("{:?}", expr), format!("{:?}", expected_expr));
1174
1175        // Test GreatThan with inclusive=false
1176        let column_filter = ColumnFilters {
1177            expr: Box::new(LogExpr::NamedIdent("message".to_string())),
1178            filters: vec![ContentFilter::GreatThan {
1179                value: "error".to_string(),
1180                inclusive: false,
1181            }],
1182        };
1183
1184        let expr_option = planner
1185            .build_column_filter(&column_filter, schema.arrow_schema())
1186            .unwrap();
1187        assert!(expr_option.is_some());
1188
1189        let expr = expr_option.unwrap();
1190        let expected_expr = col("message").gt(lit(ScalarValue::Utf8(Some("error".to_string()))));
1191
1192        assert_eq!(format!("{:?}", expr), format!("{:?}", expected_expr));
1193    }
1194
1195    #[tokio::test]
1196    async fn test_build_less_than_filter() {
1197        let table_provider =
1198            build_test_table_provider(&[("public".to_string(), "test_table".to_string())]).await;
1199        let session_state = SessionStateBuilder::new().with_default_features().build();
1200        let planner = LogQueryPlanner::new(table_provider, session_state);
1201        let schema = mock_schema();
1202
1203        // Test LessThan with inclusive=true
1204        let column_filter = ColumnFilters {
1205            expr: Box::new(LogExpr::NamedIdent("message".to_string())),
1206            filters: vec![ContentFilter::LessThan {
1207                value: "error".to_string(),
1208                inclusive: true,
1209            }],
1210        };
1211
1212        let expr_option = planner
1213            .build_column_filter(&column_filter, schema.arrow_schema())
1214            .unwrap();
1215        assert!(expr_option.is_some());
1216
1217        let expr = expr_option.unwrap();
1218        let expected_expr = col("message").lt_eq(lit(ScalarValue::Utf8(Some("error".to_string()))));
1219
1220        assert_eq!(format!("{:?}", expr), format!("{:?}", expected_expr));
1221
1222        // Test LessThan with inclusive=false
1223        let column_filter = ColumnFilters {
1224            expr: Box::new(LogExpr::NamedIdent("message".to_string())),
1225            filters: vec![ContentFilter::LessThan {
1226                value: "error".to_string(),
1227                inclusive: false,
1228            }],
1229        };
1230
1231        let expr_option = planner
1232            .build_column_filter(&column_filter, schema.arrow_schema())
1233            .unwrap();
1234        assert!(expr_option.is_some());
1235
1236        let expr = expr_option.unwrap();
1237        let expected_expr = col("message").lt(lit(ScalarValue::Utf8(Some("error".to_string()))));
1238
1239        assert_eq!(format!("{:?}", expr), format!("{:?}", expected_expr));
1240    }
1241
1242    #[tokio::test]
1243    async fn test_build_in_filter() {
1244        let table_provider =
1245            build_test_table_provider(&[("public".to_string(), "test_table".to_string())]).await;
1246        let session_state = SessionStateBuilder::new().with_default_features().build();
1247        let planner = LogQueryPlanner::new(table_provider, session_state);
1248        let schema = mock_schema();
1249
1250        // Test In filter with multiple values
1251        let column_filter = ColumnFilters {
1252            expr: Box::new(LogExpr::NamedIdent("message".to_string())),
1253            filters: vec![ContentFilter::In(vec![
1254                "error".to_string(),
1255                "warning".to_string(),
1256                "info".to_string(),
1257            ])],
1258        };
1259
1260        let expr_option = planner
1261            .build_column_filter(&column_filter, schema.arrow_schema())
1262            .unwrap();
1263        assert!(expr_option.is_some());
1264
1265        let expr = expr_option.unwrap();
1266        let expected_expr = col("message").in_list(
1267            vec![
1268                lit(ScalarValue::Utf8(Some("error".to_string()))),
1269                lit(ScalarValue::Utf8(Some("warning".to_string()))),
1270                lit(ScalarValue::Utf8(Some("info".to_string()))),
1271            ],
1272            false,
1273        );
1274
1275        assert_eq!(format!("{:?}", expr), format!("{:?}", expected_expr));
1276    }
1277
1278    #[tokio::test]
1279    async fn test_build_is_true_filter() {
1280        let table_provider =
1281            build_test_table_provider(&[("public".to_string(), "test_table".to_string())]).await;
1282        let session_state = SessionStateBuilder::new().with_default_features().build();
1283        let planner = LogQueryPlanner::new(table_provider, session_state);
1284        let schema = mock_schema();
1285
1286        // Test IsTrue filter
1287        let column_filter = ColumnFilters {
1288            expr: Box::new(LogExpr::NamedIdent("is_active".to_string())),
1289            filters: vec![ContentFilter::IsTrue],
1290        };
1291
1292        let expr_option = planner
1293            .build_column_filter(&column_filter, schema.arrow_schema())
1294            .unwrap();
1295        assert!(expr_option.is_some());
1296
1297        let expr = expr_option.unwrap();
1298        let expected_expr_string =
1299            "IsTrue(Column(Column { relation: None, name: \"is_active\" }))".to_string();
1300
1301        assert_eq!(format!("{:?}", expr), expected_expr_string);
1302    }
1303
1304    #[tokio::test]
1305    async fn test_build_filter_with_scalar_fn() {
1306        let table_provider =
1307            build_test_table_provider(&[("public".to_string(), "test_table".to_string())]).await;
1308        let session_state = SessionStateBuilder::new().with_default_features().build();
1309        let planner = LogQueryPlanner::new(table_provider, session_state);
1310        let schema = mock_schema();
1311
1312        let column_filter = ColumnFilters {
1313            expr: Box::new(LogExpr::BinaryOp {
1314                left: Box::new(LogExpr::ScalarFunc {
1315                    name: "character_length".to_string(),
1316                    args: vec![LogExpr::NamedIdent("message".to_string())],
1317                    alias: None,
1318                }),
1319                op: BinaryOperator::Gt,
1320                right: Box::new(LogExpr::Literal("100".to_string())),
1321            }),
1322            filters: vec![ContentFilter::IsTrue],
1323        };
1324
1325        let expr_option = planner
1326            .build_column_filter(&column_filter, schema.arrow_schema())
1327            .unwrap();
1328        assert!(expr_option.is_some());
1329
1330        let expr = expr_option.unwrap();
1331        let expected_expr_string = "character_length(message) > Int32(100) IS TRUE";
1332
1333        assert_eq!(format!("{}", expr), expected_expr_string);
1334    }
1335
1336    #[tokio::test]
1337    async fn test_type_inference_float_comparison() {
1338        let table_provider = build_test_table_provider_with_typed_columns(&[(
1339            "public".to_string(),
1340            "test_table".to_string(),
1341        )])
1342        .await;
1343        let session_state = SessionStateBuilder::new().with_default_features().build();
1344        let planner = LogQueryPlanner::new(table_provider, session_state);
1345        let schema = mock_schema_with_typed_columns();
1346
1347        // Test Between with float column and string literals
1348        let column_filter = ColumnFilters {
1349            expr: Box::new(LogExpr::NamedIdent("score".to_string())),
1350            filters: vec![ContentFilter::Between {
1351                start: "75.5".to_string(),
1352                end: "100.0".to_string(),
1353                start_inclusive: true,
1354                end_inclusive: false,
1355            }],
1356        };
1357
1358        let expr_option = planner
1359            .build_column_filter(&column_filter, schema.arrow_schema())
1360            .unwrap();
1361        assert!(expr_option.is_some());
1362
1363        let expr = expr_option.unwrap();
1364        // Should infer literals as Float64 since score is a float64 column
1365        let expected_expr = col("score")
1366            .gt_eq(lit(ScalarValue::Float64(Some(75.5))))
1367            .and(col("score").lt(lit(ScalarValue::Float64(Some(100.0)))));
1368
1369        assert_eq!(format!("{:?}", expr), format!("{:?}", expected_expr));
1370    }
1371
1372    #[tokio::test]
1373    async fn test_type_inference_boolean_comparison() {
1374        let table_provider = build_test_table_provider_with_typed_columns(&[(
1375            "public".to_string(),
1376            "test_table".to_string(),
1377        )])
1378        .await;
1379        let session_state = SessionStateBuilder::new().with_default_features().build();
1380        let planner = LogQueryPlanner::new(table_provider, session_state);
1381        let schema = mock_schema_with_typed_columns();
1382
1383        // Test In filter with boolean column and string literals
1384        let column_filter = ColumnFilters {
1385            expr: Box::new(LogExpr::NamedIdent("is_active".to_string())),
1386            filters: vec![ContentFilter::In(vec![
1387                "true".to_string(),
1388                "1".to_string(),
1389                "false".to_string(),
1390            ])],
1391        };
1392
1393        let expr_option = planner
1394            .build_column_filter(&column_filter, schema.arrow_schema())
1395            .unwrap();
1396        assert!(expr_option.is_some());
1397
1398        let expr = expr_option.unwrap();
1399        // Should infer string literals as boolean values
1400        let expected_expr = col("is_active").in_list(
1401            vec![
1402                lit(ScalarValue::Boolean(Some(true))),
1403                lit(ScalarValue::Boolean(Some(true))),
1404                lit(ScalarValue::Boolean(Some(false))),
1405            ],
1406            false,
1407        );
1408
1409        assert_eq!(format!("{:?}", expr), format!("{:?}", expected_expr));
1410    }
1411
1412    #[tokio::test]
1413    async fn test_fallback_to_utf8_on_parse_failure() {
1414        let table_provider = build_test_table_provider_with_typed_columns(&[(
1415            "public".to_string(),
1416            "test_table".to_string(),
1417        )])
1418        .await;
1419        let session_state = SessionStateBuilder::new().with_default_features().build();
1420        let planner = LogQueryPlanner::new(table_provider, session_state);
1421        let schema = mock_schema_with_typed_columns();
1422
1423        // Test with invalid number format - should fallback to UTF8
1424        let column_filter = ColumnFilters {
1425            expr: Box::new(LogExpr::NamedIdent("age".to_string())),
1426            filters: vec![ContentFilter::GreatThan {
1427                value: "not_a_number".to_string(),
1428                inclusive: false,
1429            }],
1430        };
1431
1432        let expr_option = planner
1433            .build_column_filter(&column_filter, schema.arrow_schema())
1434            .unwrap();
1435        assert!(expr_option.is_some());
1436
1437        let expr = expr_option.unwrap();
1438        // Should fallback to UTF8 since "not_a_number" can't be parsed as int32
1439        let expected_expr = col("age").gt(lit(ScalarValue::Utf8(Some("not_a_number".to_string()))));
1440
1441        assert_eq!(format!("{:?}", expr), format!("{:?}", expected_expr));
1442    }
1443
1444    #[tokio::test]
1445    async fn test_string_column_remains_utf8() {
1446        let table_provider = build_test_table_provider_with_typed_columns(&[(
1447            "public".to_string(),
1448            "test_table".to_string(),
1449        )])
1450        .await;
1451        let session_state = SessionStateBuilder::new().with_default_features().build();
1452        let planner = LogQueryPlanner::new(table_provider, session_state);
1453        let schema = mock_schema_with_typed_columns();
1454
1455        // Test with string column - should remain UTF8 even if value looks like a number
1456        let column_filter = ColumnFilters {
1457            expr: Box::new(LogExpr::NamedIdent("message".to_string())),
1458            filters: vec![ContentFilter::GreatThan {
1459                value: "123".to_string(),
1460                inclusive: false,
1461            }],
1462        };
1463
1464        let expr_option = planner
1465            .build_column_filter(&column_filter, schema.arrow_schema())
1466            .unwrap();
1467        assert!(expr_option.is_some());
1468
1469        let expr = expr_option.unwrap();
1470        // Should remain UTF8 since message is a string column
1471        let expected_expr = col("message").gt(lit(ScalarValue::Utf8(Some("123".to_string()))));
1472
1473        assert_eq!(format!("{:?}", expr), format!("{:?}", expected_expr));
1474    }
1475
1476    #[tokio::test]
1477    async fn test_all_binary_operators() {
1478        let table_provider = build_test_table_provider_with_typed_columns(&[(
1479            "public".to_string(),
1480            "test_table".to_string(),
1481        )])
1482        .await;
1483        let session_state = SessionStateBuilder::new().with_default_features().build();
1484        let planner = LogQueryPlanner::new(table_provider, session_state);
1485        let schema = mock_schema_with_typed_columns();
1486
1487        let df_schema = DFSchema::try_from(schema.arrow_schema().clone()).unwrap();
1488
1489        // Test all comparison operators
1490        let test_cases = vec![
1491            (BinaryOperator::Eq, Operator::Eq),
1492            (BinaryOperator::Ne, Operator::NotEq),
1493            (BinaryOperator::Lt, Operator::Lt),
1494            (BinaryOperator::Le, Operator::LtEq),
1495            (BinaryOperator::Gt, Operator::Gt),
1496            (BinaryOperator::Ge, Operator::GtEq),
1497            (BinaryOperator::Plus, Operator::Plus),
1498            (BinaryOperator::Minus, Operator::Minus),
1499            (BinaryOperator::Multiply, Operator::Multiply),
1500            (BinaryOperator::Divide, Operator::Divide),
1501            (BinaryOperator::Modulo, Operator::Modulo),
1502            (BinaryOperator::And, Operator::And),
1503            (BinaryOperator::Or, Operator::Or),
1504        ];
1505
1506        for (binary_op, expected_df_op) in test_cases {
1507            let binary_expr = LogExpr::BinaryOp {
1508                left: Box::new(LogExpr::NamedIdent("age".to_string())),
1509                op: binary_op,
1510                right: Box::new(LogExpr::Literal("25".to_string())),
1511            };
1512
1513            let expr = planner
1514                .log_expr_to_df_expr(&binary_expr, &df_schema)
1515                .unwrap();
1516
1517            let expected_expr = Expr::BinaryExpr(BinaryExpr {
1518                left: Box::new(col("age")),
1519                op: expected_df_op,
1520                right: Box::new(lit(ScalarValue::Int32(Some(25)))),
1521            });
1522
1523            assert_eq!(format!("{:?}", expr), format!("{:?}", expected_expr));
1524        }
1525    }
1526
1527    #[tokio::test]
1528    async fn test_nested_binary_operations() {
1529        let table_provider = build_test_table_provider_with_typed_columns(&[(
1530            "public".to_string(),
1531            "test_table".to_string(),
1532        )])
1533        .await;
1534        let session_state = SessionStateBuilder::new().with_default_features().build();
1535        let planner = LogQueryPlanner::new(table_provider, session_state);
1536        let schema = mock_schema_with_typed_columns();
1537
1538        let df_schema = DFSchema::try_from(schema.arrow_schema().clone()).unwrap();
1539
1540        // Test nested binary operations: (age + 5) > 30
1541        let nested_binary_expr = LogExpr::BinaryOp {
1542            left: Box::new(LogExpr::BinaryOp {
1543                left: Box::new(LogExpr::NamedIdent("age".to_string())),
1544                op: BinaryOperator::Plus,
1545                right: Box::new(LogExpr::Literal("5".to_string())),
1546            }),
1547            op: BinaryOperator::Gt,
1548            right: Box::new(LogExpr::Literal("30".to_string())),
1549        };
1550
1551        let expr = planner
1552            .log_expr_to_df_expr(&nested_binary_expr, &df_schema)
1553            .unwrap();
1554
1555        // Verify the nested structure is properly created
1556        let expected_expr_debug = r#"BinaryExpr(BinaryExpr { left: BinaryExpr(BinaryExpr { left: Column(Column { relation: None, name: "age" }), op: Plus, right: Literal(Int32(5), None) }), op: Gt, right: Literal(Int32(30), None) })"#;
1557        assert_eq!(format!("{:?}", expr), expected_expr_debug);
1558    }
1559}