// src/query/src/log_query/planner.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
15use arrow_schema::{DataType, Schema as ArrowSchema};
16use catalog::table_source::DfTableSourceProvider;
17use common_function::utils::escape_like_pattern;
18use datafusion::datasource::DefaultTableSource;
19use datafusion::execution::SessionState;
20use datafusion_common::{DFSchema, ScalarValue};
21use datafusion_expr::utils::{conjunction, disjunction};
22use datafusion_expr::{
23    col, lit, not, BinaryExpr, Expr, ExprSchemable, LogicalPlan, LogicalPlanBuilder, Operator,
24};
25use datafusion_sql::TableReference;
26use datatypes::schema::Schema;
27use log_query::{BinaryOperator, EqualValue, LogExpr, LogQuery, TimeFilter};
28use snafu::{OptionExt, ResultExt};
29use table::table::adapter::DfTableProviderAdapter;
30
31use crate::log_query::error::{
32    CatalogSnafu, DataFusionPlanningSnafu, Result, TimeIndexNotFoundSnafu, UnexpectedLogExprSnafu,
33    UnimplementedSnafu, UnknownAggregateFunctionSnafu, UnknownScalarFunctionSnafu,
34    UnknownTableSnafu,
35};
36
/// Default number of rows to fetch when the query does not specify a limit.
const DEFAULT_LIMIT: usize = 1000;

/// Planner that lowers a [`LogQuery`] into a DataFusion [`LogicalPlan`].
pub struct LogQueryPlanner {
    // Resolves table references to DataFusion table sources via the catalog.
    table_provider: DfTableSourceProvider,
    // Session state used to look up scalar/aggregate functions by name.
    session_state: SessionState,
}
43
44impl LogQueryPlanner {
    /// Creates a planner over the given table provider and session state.
    pub fn new(table_provider: DfTableSourceProvider, session_state: SessionState) -> Self {
        Self {
            table_provider,
            session_state,
        }
    }
51
52    pub async fn query_to_plan(&mut self, query: LogQuery) -> Result<LogicalPlan> {
53        // Resolve table
54        let table_ref: TableReference = query.table.table_ref().into();
55        let table_source = self
56            .table_provider
57            .resolve_table(table_ref.clone())
58            .await
59            .context(CatalogSnafu)?;
60        let schema = table_source
61            .as_any()
62            .downcast_ref::<DefaultTableSource>()
63            .context(UnknownTableSnafu)?
64            .table_provider
65            .as_any()
66            .downcast_ref::<DfTableProviderAdapter>()
67            .context(UnknownTableSnafu)?
68            .table()
69            .schema();
70
71        // Build the initial scan plan
72        let mut plan_builder = LogicalPlanBuilder::scan(table_ref, table_source, None)
73            .context(DataFusionPlanningSnafu)?;
74        let df_schema = plan_builder.schema().clone();
75
76        // Collect filter expressions
77        let mut filters = Vec::new();
78
79        // Time filter
80        filters.push(self.build_time_filter(&query.time_filter, &schema)?);
81
82        if let Some(filters_expr) = self.build_filters(&query.filters, df_schema.as_arrow())? {
83            filters.push(filters_expr);
84        }
85
86        // Apply filters
87        if !filters.is_empty() {
88            let filter_expr = filters.into_iter().reduce(|a, b| a.and(b)).unwrap();
89            plan_builder = plan_builder
90                .filter(filter_expr)
91                .context(DataFusionPlanningSnafu)?;
92        }
93
94        // Apply projections
95        if !query.columns.is_empty() {
96            let projected_columns = query.columns.iter().map(col).collect::<Vec<_>>();
97            plan_builder = plan_builder
98                .project(projected_columns)
99                .context(DataFusionPlanningSnafu)?;
100        }
101
102        // Apply limit
103        plan_builder = plan_builder
104            .limit(
105                query.limit.skip.unwrap_or(0),
106                Some(query.limit.fetch.unwrap_or(DEFAULT_LIMIT)),
107            )
108            .context(DataFusionPlanningSnafu)?;
109
110        // Apply log expressions
111        for expr in &query.exprs {
112            plan_builder = self.process_log_expr(plan_builder, expr)?;
113        }
114
115        // Build the final plan
116        let plan = plan_builder.build().context(DataFusionPlanningSnafu)?;
117
118        Ok(plan)
119    }
120
121    fn build_time_filter(&self, time_filter: &TimeFilter, schema: &Schema) -> Result<Expr> {
122        let timestamp_col = schema
123            .timestamp_column()
124            .with_context(|| TimeIndexNotFoundSnafu {})?
125            .name
126            .clone();
127
128        let start_time = ScalarValue::Utf8(time_filter.start.clone());
129        let end_time = ScalarValue::Utf8(
130            time_filter
131                .end
132                .clone()
133                .or(Some("9999-12-31T23:59:59Z".to_string())),
134        );
135        let expr = col(timestamp_col.clone())
136            .gt_eq(lit(start_time))
137            .and(col(timestamp_col).lt_eq(lit(end_time)));
138
139        Ok(expr)
140    }
141    //disjunction
142    fn build_filters(
143        &self,
144        filters: &log_query::Filters,
145        schema: &ArrowSchema,
146    ) -> Result<Option<Expr>> {
147        match filters {
148            log_query::Filters::And(filters) => {
149                let exprs = filters
150                    .iter()
151                    .filter_map(|filter| self.build_filters(filter, schema).transpose())
152                    .try_collect::<Vec<_>>()?;
153                if exprs.is_empty() {
154                    Ok(None)
155                } else {
156                    Ok(conjunction(exprs))
157                }
158            }
159            log_query::Filters::Or(filters) => {
160                let exprs = filters
161                    .iter()
162                    .filter_map(|filter| self.build_filters(filter, schema).transpose())
163                    .try_collect::<Vec<_>>()?;
164                if exprs.is_empty() {
165                    Ok(None)
166                } else {
167                    Ok(disjunction(exprs))
168                }
169            }
170            log_query::Filters::Not(filter) => {
171                if let Some(expr) = self.build_filters(filter, schema)? {
172                    Ok(Some(not(expr)))
173                } else {
174                    Ok(None)
175                }
176            }
177            log_query::Filters::Single(column_filters) => {
178                // Build a single column filter
179                self.build_column_filter(column_filters, schema)
180            }
181        }
182    }
183
184    /// Builds filter expression from ColumnFilters (new structure with expr + filters)
185    fn build_column_filter(
186        &self,
187        column_filter: &log_query::ColumnFilters,
188        schema: &ArrowSchema,
189    ) -> Result<Option<Expr>> {
190        // Convert ArrowSchema to DFSchema for the more generic function
191        let df_schema = DFSchema::try_from(schema.clone()).context(DataFusionPlanningSnafu)?;
192        let col_expr = self.log_expr_to_df_expr(&column_filter.expr, &df_schema)?;
193
194        let filter_exprs = column_filter
195            .filters
196            .iter()
197            .filter_map(|filter| {
198                self.build_content_filter_with_expr(col_expr.clone(), filter, &df_schema)
199                    .transpose()
200            })
201            .try_collect::<Vec<_>>()?;
202
203        if filter_exprs.is_empty() {
204            return Ok(Some(col_expr.is_true()));
205        }
206
207        // Combine all filters with AND logic
208        Ok(conjunction(filter_exprs))
209    }
210
211    /// Builds filter expression from a single ContentFilter using a provided column expression
212    #[allow(clippy::only_used_in_recursion)]
213    fn build_content_filter_with_expr(
214        &self,
215        col_expr: Expr,
216        filter: &log_query::ContentFilter,
217        schema: &DFSchema,
218    ) -> Result<Option<Expr>> {
219        match filter {
220            log_query::ContentFilter::Exact(value) => Ok(Some(
221                col_expr.like(lit(ScalarValue::Utf8(Some(escape_like_pattern(value))))),
222            )),
223            log_query::ContentFilter::Prefix(value) => Ok(Some(col_expr.like(lit(
224                ScalarValue::Utf8(Some(format!("{}%", escape_like_pattern(value)))),
225            )))),
226            log_query::ContentFilter::Postfix(value) => Ok(Some(col_expr.like(lit(
227                ScalarValue::Utf8(Some(format!("%{}", escape_like_pattern(value)))),
228            )))),
229            log_query::ContentFilter::Contains(value) => Ok(Some(col_expr.like(lit(
230                ScalarValue::Utf8(Some(format!("%{}%", escape_like_pattern(value)))),
231            )))),
232            log_query::ContentFilter::Regex(_pattern) => Err(UnimplementedSnafu {
233                feature: "regex filter",
234            }
235            .build()),
236            log_query::ContentFilter::Exist => Ok(Some(col_expr.is_not_null())),
237            log_query::ContentFilter::Between {
238                start,
239                end,
240                start_inclusive,
241                end_inclusive,
242            } => {
243                let start_literal = self.create_inferred_literal(start, &col_expr, schema);
244                let end_literal = self.create_inferred_literal(end, &col_expr, schema);
245
246                let left = if *start_inclusive {
247                    col_expr.clone().gt_eq(start_literal)
248                } else {
249                    col_expr.clone().gt(start_literal)
250                };
251                let right = if *end_inclusive {
252                    col_expr.lt_eq(end_literal)
253                } else {
254                    col_expr.lt(end_literal)
255                };
256                Ok(Some(left.and(right)))
257            }
258            log_query::ContentFilter::GreatThan { value, inclusive } => {
259                let value_literal = self.create_inferred_literal(value, &col_expr, schema);
260                let comparison_expr = if *inclusive {
261                    col_expr.gt_eq(value_literal)
262                } else {
263                    col_expr.gt(value_literal)
264                };
265                Ok(Some(comparison_expr))
266            }
267            log_query::ContentFilter::LessThan { value, inclusive } => {
268                let value_literal = self.create_inferred_literal(value, &col_expr, schema);
269                if *inclusive {
270                    Ok(Some(col_expr.lt_eq(value_literal)))
271                } else {
272                    Ok(Some(col_expr.lt(value_literal)))
273                }
274            }
275            log_query::ContentFilter::In(values) => {
276                let inferred_values: Vec<_> = values
277                    .iter()
278                    .map(|v| self.create_inferred_literal(v, &col_expr, schema))
279                    .collect();
280                Ok(Some(col_expr.in_list(inferred_values, false)))
281            }
282            log_query::ContentFilter::IsTrue => Ok(Some(col_expr.is_true())),
283            log_query::ContentFilter::IsFalse => Ok(Some(col_expr.is_false())),
284            log_query::ContentFilter::Equal(value) => {
285                let value_literal = Self::create_eq_literal(value.clone());
286                Ok(Some(col_expr.eq(value_literal)))
287            }
288            log_query::ContentFilter::Compound(filters, op) => {
289                let exprs = filters
290                    .iter()
291                    .filter_map(|filter| {
292                        self.build_content_filter_with_expr(col_expr.clone(), filter, schema)
293                            .transpose()
294                    })
295                    .try_collect::<Vec<_>>()?;
296
297                if exprs.is_empty() {
298                    return Ok(None);
299                }
300
301                match op {
302                    log_query::ConjunctionOperator::And => Ok(conjunction(exprs)),
303                    log_query::ConjunctionOperator::Or => {
304                        // Build a disjunction (OR) of expressions
305                        Ok(exprs.into_iter().reduce(|a, b| a.or(b)))
306                    }
307                }
308            }
309        }
310    }
311
312    fn build_aggr_func(
313        &self,
314        schema: &DFSchema,
315        fn_name: &str,
316        args: &[LogExpr],
317        by: &[LogExpr],
318    ) -> Result<(Expr, Vec<Expr>)> {
319        let aggr_fn = self
320            .session_state
321            .aggregate_functions()
322            .get(fn_name)
323            .context(UnknownAggregateFunctionSnafu {
324                name: fn_name.to_string(),
325            })?;
326        let args = args
327            .iter()
328            .map(|expr| self.log_expr_to_df_expr(expr, schema))
329            .try_collect::<Vec<_>>()?;
330        let group_exprs = by
331            .iter()
332            .map(|expr| self.log_expr_to_df_expr(expr, schema))
333            .try_collect::<Vec<_>>()?;
334        let aggr_expr = aggr_fn.call(args);
335
336        Ok((aggr_expr, group_exprs))
337    }
338
339    /// Converts a LogExpr to a DataFusion Expr, handling all expression types.
340    fn log_expr_to_df_expr(&self, expr: &LogExpr, schema: &DFSchema) -> Result<Expr> {
341        match expr {
342            LogExpr::NamedIdent(name) => Ok(col(name)),
343            LogExpr::PositionalIdent(index) => Ok(col(schema.field(*index).name())),
344            LogExpr::Literal(literal) => Ok(lit(ScalarValue::Utf8(Some(literal.clone())))),
345            LogExpr::BinaryOp { left, op, right } => {
346                // For binary operations, always use type inference (matches original behavior)
347                self.build_binary_expr(left, op, right, schema)
348            }
349            LogExpr::ScalarFunc { name, args, alias } => {
350                self.build_scalar_func(schema, name, args, alias)
351            }
352            LogExpr::Alias { expr, alias } => {
353                let df_expr = self.log_expr_to_df_expr(expr, schema)?;
354                Ok(df_expr.alias(alias))
355            }
356            LogExpr::AggrFunc { .. } | LogExpr::Filter { .. } | LogExpr::Decompose { .. } => {
357                UnexpectedLogExprSnafu {
358                    expr: expr.clone(),
359                    expected: "not a typical expression",
360                }
361                .fail()
362            }
363        }
364    }
365
366    fn build_scalar_func(
367        &self,
368        schema: &DFSchema,
369        name: &str,
370        args: &[LogExpr],
371        alias: &Option<String>,
372    ) -> Result<Expr> {
373        let args = args
374            .iter()
375            .map(|expr| self.log_expr_to_df_expr(expr, schema))
376            .try_collect::<Vec<_>>()?;
377        let func = self.session_state.scalar_functions().get(name).context(
378            UnknownScalarFunctionSnafu {
379                name: name.to_string(),
380            },
381        )?;
382        let expr = func.call(args);
383
384        if let Some(alias) = alias {
385            Ok(expr.alias(alias))
386        } else {
387            Ok(expr)
388        }
389    }
390
391    /// Convert BinaryOperator to DataFusion's Operator.
392    fn binary_operator_to_df_operator(op: &BinaryOperator) -> Operator {
393        match op {
394            BinaryOperator::Eq => Operator::Eq,
395            BinaryOperator::Ne => Operator::NotEq,
396            BinaryOperator::Lt => Operator::Lt,
397            BinaryOperator::Le => Operator::LtEq,
398            BinaryOperator::Gt => Operator::Gt,
399            BinaryOperator::Ge => Operator::GtEq,
400            BinaryOperator::Plus => Operator::Plus,
401            BinaryOperator::Minus => Operator::Minus,
402            BinaryOperator::Multiply => Operator::Multiply,
403            BinaryOperator::Divide => Operator::Divide,
404            BinaryOperator::Modulo => Operator::Modulo,
405            BinaryOperator::And => Operator::And,
406            BinaryOperator::Or => Operator::Or,
407        }
408    }
409
410    /// Parse a string literal to the appropriate ScalarValue based on target DataType.
411    /// Falls back to UTF8 if parsing fails or type is not supported.
412    fn infer_literal_scalar_value(&self, literal: &str, target_type: &DataType) -> ScalarValue {
413        let utf8_literal = ScalarValue::Utf8(Some(literal.to_string()));
414        utf8_literal.cast_to(target_type).unwrap_or(utf8_literal)
415    }
416
417    /// Build binary expression with type inference for literals.
418    /// Attempts to infer literal types from the non-literal operand's type.
419    fn build_binary_expr(
420        &self,
421        left: &LogExpr,
422        op: &BinaryOperator,
423        right: &LogExpr,
424        schema: &DFSchema,
425    ) -> Result<Expr> {
426        // Convert both sides to DataFusion expressions first
427        let mut left_expr = self.log_expr_to_df_expr(left, schema)?;
428        let mut right_expr = self.log_expr_to_df_expr(right, schema)?;
429
430        // Try to infer literal types based on the other operand
431        match (left, right) {
432            (LogExpr::Literal(_), LogExpr::Literal(_)) => {
433                // both are literal, do nothing
434            }
435            (LogExpr::Literal(literal), _) => {
436                // Left is literal, try to infer from right
437                if let Ok(right_type) = right_expr.get_type(schema) {
438                    let inferred_scalar = self.infer_literal_scalar_value(literal, &right_type);
439                    left_expr = lit(inferred_scalar);
440                }
441            }
442            (_, LogExpr::Literal(literal)) => {
443                // Right is literal, try to infer from left
444                if let Ok(left_type) = left_expr.get_type(schema) {
445                    let inferred_scalar = self.infer_literal_scalar_value(literal, &left_type);
446                    right_expr = lit(inferred_scalar);
447                }
448            }
449            _ => {
450                // Neither is a simple literal, no type inference needed
451            }
452        }
453
454        let df_op = Self::binary_operator_to_df_operator(op);
455        Ok(Expr::BinaryExpr(BinaryExpr {
456            left: Box::new(left_expr),
457            op: df_op,
458            right: Box::new(right_expr),
459        }))
460    }
461
462    /// Create a type-inferred literal based on the provided expression's type.
463    /// Falls back to UTF8 if type inference fails.
464    fn create_inferred_literal(&self, value: &str, expr: &Expr, schema: &DFSchema) -> Expr {
465        if let Ok(expr_type) = expr.get_type(schema) {
466            lit(self.infer_literal_scalar_value(value, &expr_type))
467        } else {
468            lit(ScalarValue::Utf8(Some(value.to_string())))
469        }
470    }
471
472    fn create_eq_literal(value: EqualValue) -> Expr {
473        match value {
474            EqualValue::String(s) => lit(ScalarValue::Utf8(Some(s))),
475            EqualValue::Float(n) => lit(ScalarValue::Float64(Some(n))),
476            EqualValue::Int(n) => lit(ScalarValue::Int64(Some(n))),
477            EqualValue::Boolean(b) => lit(ScalarValue::Boolean(Some(b))),
478            EqualValue::UInt(n) => lit(ScalarValue::UInt64(Some(n))),
479        }
480    }
481
482    /// Process LogExpr recursively.
483    ///
484    /// Return the [`LogicalPlanBuilder`] after modification and the resulting expression's names.
485    fn process_log_expr(
486        &self,
487        plan_builder: LogicalPlanBuilder,
488        expr: &LogExpr,
489    ) -> Result<LogicalPlanBuilder> {
490        let mut plan_builder = plan_builder;
491
492        match expr {
493            LogExpr::AggrFunc {
494                name,
495                args,
496                by,
497                range: _range,
498                alias,
499            } => {
500                let schema = plan_builder.schema();
501                let (mut aggr_expr, group_exprs) = self.build_aggr_func(schema, name, args, by)?;
502                if let Some(alias) = alias {
503                    aggr_expr = aggr_expr.alias(alias);
504                }
505
506                plan_builder = plan_builder
507                    .aggregate(group_exprs, [aggr_expr.clone()])
508                    .context(DataFusionPlanningSnafu)?;
509            }
510            LogExpr::Filter { filter } => {
511                let schema = plan_builder.schema();
512                if let Some(filter_expr) = self.build_column_filter(filter, schema.as_arrow())? {
513                    plan_builder = plan_builder
514                        .filter(filter_expr)
515                        .context(DataFusionPlanningSnafu)?;
516                }
517            }
518            LogExpr::ScalarFunc { name, args, alias } => {
519                let schema = plan_builder.schema();
520                let expr = self.build_scalar_func(schema, name, args, alias)?;
521                plan_builder = plan_builder
522                    .project([expr])
523                    .context(DataFusionPlanningSnafu)?;
524            }
525            LogExpr::NamedIdent(_) | LogExpr::PositionalIdent(_) => {
526                // nothing to do, return empty vec.
527            }
528            LogExpr::Alias { expr, alias } => {
529                let schema = plan_builder.schema();
530                let df_expr = self.log_expr_to_df_expr(expr, schema)?;
531                let aliased_expr = df_expr.alias(alias);
532                plan_builder = plan_builder
533                    .project([aliased_expr.clone()])
534                    .context(DataFusionPlanningSnafu)?;
535            }
536            LogExpr::BinaryOp { .. } => {
537                let schema = plan_builder.schema();
538                let binary_expr = self.log_expr_to_df_expr(expr, schema)?;
539
540                plan_builder = plan_builder
541                    .project([binary_expr])
542                    .context(DataFusionPlanningSnafu)?;
543            }
544            _ => {
545                UnimplementedSnafu {
546                    feature: "log expression",
547                }
548                .fail()?;
549            }
550        }
551        Ok(plan_builder)
552    }
553}
554
555#[cfg(test)]
556mod tests {
557    use std::sync::Arc;
558
559    use catalog::memory::MemoryCatalogManager;
560    use catalog::RegisterTableRequest;
561    use common_catalog::consts::DEFAULT_CATALOG_NAME;
562    use common_query::test_util::DummyDecoder;
563    use datafusion::execution::SessionStateBuilder;
564    use datatypes::prelude::ConcreteDataType;
565    use datatypes::schema::{ColumnSchema, SchemaRef};
566    use log_query::{
567        ColumnFilters, ConjunctionOperator, ContentFilter, Context, Filters, Limit, LogExpr,
568    };
569    use session::context::QueryContext;
570    use table::metadata::{TableInfoBuilder, TableMetaBuilder};
571    use table::table_name::TableName;
572    use table::test_util::EmptyTable;
573
574    use super::*;
575
576    fn mock_schema() -> SchemaRef {
577        let columns = vec![
578            ColumnSchema::new(
579                "message".to_string(),
580                ConcreteDataType::string_datatype(),
581                false,
582            ),
583            ColumnSchema::new(
584                "timestamp".to_string(),
585                ConcreteDataType::timestamp_millisecond_datatype(),
586                false,
587            )
588            .with_time_index(true),
589            ColumnSchema::new(
590                "host".to_string(),
591                ConcreteDataType::string_datatype(),
592                true,
593            ),
594            ColumnSchema::new(
595                "is_active".to_string(),
596                ConcreteDataType::boolean_datatype(),
597                true,
598            ),
599        ];
600
601        Arc::new(Schema::new(columns))
602    }
603
604    fn mock_schema_with_typed_columns() -> SchemaRef {
605        let columns = vec![
606            ColumnSchema::new(
607                "message".to_string(),
608                ConcreteDataType::string_datatype(),
609                false,
610            ),
611            ColumnSchema::new(
612                "timestamp".to_string(),
613                ConcreteDataType::timestamp_millisecond_datatype(),
614                false,
615            )
616            .with_time_index(true),
617            ColumnSchema::new(
618                "host".to_string(),
619                ConcreteDataType::string_datatype(),
620                true,
621            ),
622            ColumnSchema::new(
623                "is_active".to_string(),
624                ConcreteDataType::boolean_datatype(),
625                true,
626            ),
627            // Add more typed columns for comprehensive testing
628            ColumnSchema::new("age".to_string(), ConcreteDataType::int32_datatype(), true),
629            ColumnSchema::new(
630                "score".to_string(),
631                ConcreteDataType::float64_datatype(),
632                true,
633            ),
634            ColumnSchema::new(
635                "count".to_string(),
636                ConcreteDataType::uint64_datatype(),
637                true,
638            ),
639        ];
640
641        Arc::new(Schema::new(columns))
642    }
643
    /// Registers each `(schema, table)` pair under the `greptime` catalog
    /// using the default mock schema (`message`, `timestamp`, `host`,
    /// `is_active`).
    async fn build_test_table_provider(
        table_name_tuples: &[(String, String)],
    ) -> DfTableSourceProvider {
        build_test_table_provider_with_schema(table_name_tuples, mock_schema()).await
    }
650
    /// Registers each `(schema, table)` pair under the `greptime` catalog
    /// using the extended schema with typed columns, for type-inference tests.
    async fn build_test_table_provider_with_typed_columns(
        table_name_tuples: &[(String, String)],
    ) -> DfTableSourceProvider {
        build_test_table_provider_with_schema(table_name_tuples, mock_schema_with_typed_columns())
            .await
    }
658
659    async fn build_test_table_provider_with_schema(
660        table_name_tuples: &[(String, String)],
661        schema: SchemaRef,
662    ) -> DfTableSourceProvider {
663        let catalog_list = MemoryCatalogManager::with_default_setup();
664        for (schema_name, table_name) in table_name_tuples {
665            let table_meta = TableMetaBuilder::empty()
666                .schema(schema.clone())
667                .primary_key_indices(vec![2])
668                .value_indices(vec![0])
669                .next_column_id(1024)
670                .build()
671                .unwrap();
672            let table_info = TableInfoBuilder::default()
673                .name(table_name.to_string())
674                .meta(table_meta)
675                .build()
676                .unwrap();
677            let table = EmptyTable::from_table_info(&table_info);
678
679            catalog_list
680                .register_table_sync(RegisterTableRequest {
681                    catalog: DEFAULT_CATALOG_NAME.to_string(),
682                    schema: schema_name.to_string(),
683                    table_name: table_name.to_string(),
684                    table_id: 1024,
685                    table,
686                })
687                .unwrap();
688        }
689
690        DfTableSourceProvider::new(
691            catalog_list,
692            false,
693            QueryContext::arc(),
694            DummyDecoder::arc(),
695            false,
696        )
697    }
698
699    #[tokio::test]
700    async fn test_query_to_plan() {
701        let table_provider =
702            build_test_table_provider(&[("public".to_string(), "test_table".to_string())]).await;
703        let session_state = SessionStateBuilder::new().with_default_features().build();
704        let mut planner = LogQueryPlanner::new(table_provider, session_state);
705
706        let log_query = LogQuery {
707            table: TableName::new(DEFAULT_CATALOG_NAME, "public", "test_table"),
708            time_filter: TimeFilter {
709                start: Some("2021-01-01T00:00:00Z".to_string()),
710                end: Some("2021-01-02T00:00:00Z".to_string()),
711                span: None,
712            },
713            filters: Filters::Single(ColumnFilters {
714                expr: Box::new(LogExpr::NamedIdent("message".to_string())),
715                filters: vec![ContentFilter::Contains("error".to_string())],
716            }),
717            limit: Limit {
718                skip: None,
719                fetch: Some(100),
720            },
721            context: Context::None,
722            columns: vec![],
723            exprs: vec![],
724        };
725
726        let plan = planner.query_to_plan(log_query).await.unwrap();
727        let expected = "Limit: skip=0, fetch=100 [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N, is_active:Boolean;N]\
728\n  Filter: greptime.public.test_table.timestamp >= Utf8(\"2021-01-01T00:00:00Z\") AND greptime.public.test_table.timestamp <= Utf8(\"2021-01-02T00:00:00Z\") AND greptime.public.test_table.message LIKE Utf8(\"%error%\") [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N, is_active:Boolean;N]\
729\n    TableScan: greptime.public.test_table [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N, is_active:Boolean;N]";
730
731        assert_eq!(plan.display_indent_schema().to_string(), expected);
732    }
733
734    #[tokio::test]
735    async fn test_build_time_filter() {
736        let table_provider =
737            build_test_table_provider(&[("public".to_string(), "test_table".to_string())]).await;
738        let session_state = SessionStateBuilder::new().with_default_features().build();
739        let planner = LogQueryPlanner::new(table_provider, session_state);
740
741        let time_filter = TimeFilter {
742            start: Some("2021-01-01T00:00:00Z".to_string()),
743            end: Some("2021-01-02T00:00:00Z".to_string()),
744            span: None,
745        };
746
747        let expr = planner
748            .build_time_filter(&time_filter, &mock_schema())
749            .unwrap();
750
751        let expected_expr = col("timestamp")
752            .gt_eq(lit(ScalarValue::Utf8(Some(
753                "2021-01-01T00:00:00Z".to_string(),
754            ))))
755            .and(col("timestamp").lt_eq(lit(ScalarValue::Utf8(Some(
756                "2021-01-02T00:00:00Z".to_string(),
757            )))));
758
759        assert_eq!(format!("{:?}", expr), format!("{:?}", expected_expr));
760    }
761
762    #[tokio::test]
763    async fn test_build_time_filter_without_end() {
764        let table_provider =
765            build_test_table_provider(&[("public".to_string(), "test_table".to_string())]).await;
766        let session_state = SessionStateBuilder::new().with_default_features().build();
767        let planner = LogQueryPlanner::new(table_provider, session_state);
768
769        let time_filter = TimeFilter {
770            start: Some("2021-01-01T00:00:00Z".to_string()),
771            end: None,
772            span: None,
773        };
774
775        let expr = planner
776            .build_time_filter(&time_filter, &mock_schema())
777            .unwrap();
778
779        let expected_expr = col("timestamp")
780            .gt_eq(lit(ScalarValue::Utf8(Some(
781                "2021-01-01T00:00:00Z".to_string(),
782            ))))
783            .and(col("timestamp").lt_eq(lit(ScalarValue::Utf8(Some(
784                "9999-12-31T23:59:59Z".to_string(),
785            )))));
786
787        assert_eq!(format!("{:?}", expr), format!("{:?}", expected_expr));
788    }
789
790    #[tokio::test]
791    async fn test_build_content_filter() {
792        let table_provider =
793            build_test_table_provider(&[("public".to_string(), "test_table".to_string())]).await;
794        let session_state = SessionStateBuilder::new().with_default_features().build();
795        let planner = LogQueryPlanner::new(table_provider, session_state);
796        let schema = mock_schema();
797
798        let column_filter = ColumnFilters {
799            expr: Box::new(LogExpr::NamedIdent("message".to_string())),
800            filters: vec![
801                ContentFilter::Contains("error".to_string()),
802                ContentFilter::Prefix("WARN".to_string()),
803            ],
804        };
805
806        let expr_option = planner
807            .build_column_filter(&column_filter, schema.arrow_schema())
808            .unwrap();
809        assert!(expr_option.is_some());
810
811        let expr = expr_option.unwrap();
812
813        let expected_expr = col("message")
814            .like(lit(ScalarValue::Utf8(Some("%error%".to_string()))))
815            .and(col("message").like(lit(ScalarValue::Utf8(Some("WARN%".to_string())))));
816
817        assert_eq!(format!("{:?}", expr), format!("{:?}", expected_expr));
818    }
819
    #[tokio::test]
    async fn test_query_to_plan_with_only_skip() {
        // When `fetch` is absent the planner falls back to DEFAULT_LIMIT (1000)
        // while still honoring the requested `skip`, yielding `skip=10, fetch=1000`.
        let table_provider =
            build_test_table_provider(&[("public".to_string(), "test_table".to_string())]).await;
        let session_state = SessionStateBuilder::new().with_default_features().build();
        let mut planner = LogQueryPlanner::new(table_provider, session_state);

        let log_query = LogQuery {
            table: TableName::new(DEFAULT_CATALOG_NAME, "public", "test_table"),
            time_filter: TimeFilter {
                start: Some("2021-01-01T00:00:00Z".to_string()),
                end: Some("2021-01-02T00:00:00Z".to_string()),
                span: None,
            },
            filters: Filters::Single(ColumnFilters {
                expr: Box::new(LogExpr::NamedIdent("message".to_string())),
                filters: vec![ContentFilter::Contains("error".to_string())],
            }),
            // Only `skip` is set; `fetch` is deliberately left unset.
            limit: Limit {
                skip: Some(10),
                fetch: None,
            },
            context: Context::None,
            columns: vec![],
            exprs: vec![],
        };

        let plan = planner.query_to_plan(log_query).await.unwrap();
        // Exact plan shape: Limit over Filter (time range AND content LIKE) over TableScan.
        let expected = "Limit: skip=10, fetch=1000 [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N, is_active:Boolean;N]\
\n  Filter: greptime.public.test_table.timestamp >= Utf8(\"2021-01-01T00:00:00Z\") AND greptime.public.test_table.timestamp <= Utf8(\"2021-01-02T00:00:00Z\") AND greptime.public.test_table.message LIKE Utf8(\"%error%\") [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N, is_active:Boolean;N]\
\n    TableScan: greptime.public.test_table [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N, is_active:Boolean;N]";

        assert_eq!(plan.display_indent_schema().to_string(), expected);
    }
854
    #[tokio::test]
    async fn test_query_to_plan_without_limit() {
        // With neither `skip` nor `fetch` supplied, the plan defaults to
        // `skip=0, fetch=1000` (DEFAULT_LIMIT).
        let table_provider =
            build_test_table_provider(&[("public".to_string(), "test_table".to_string())]).await;
        let session_state = SessionStateBuilder::new().with_default_features().build();
        let mut planner = LogQueryPlanner::new(table_provider, session_state);

        let log_query = LogQuery {
            table: TableName::new(DEFAULT_CATALOG_NAME, "public", "test_table"),
            time_filter: TimeFilter {
                start: Some("2021-01-01T00:00:00Z".to_string()),
                end: Some("2021-01-02T00:00:00Z".to_string()),
                span: None,
            },
            filters: Filters::Single(ColumnFilters {
                expr: Box::new(LogExpr::NamedIdent("message".to_string())),
                filters: vec![ContentFilter::Contains("error".to_string())],
            }),
            // Both limit fields intentionally unset.
            limit: Limit {
                skip: None,
                fetch: None,
            },
            context: Context::None,
            columns: vec![],
            exprs: vec![],
        };

        let plan = planner.query_to_plan(log_query).await.unwrap();
        // Exact plan shape: defaulted Limit over Filter over TableScan.
        let expected = "Limit: skip=0, fetch=1000 [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N, is_active:Boolean;N]\
\n  Filter: greptime.public.test_table.timestamp >= Utf8(\"2021-01-01T00:00:00Z\") AND greptime.public.test_table.timestamp <= Utf8(\"2021-01-02T00:00:00Z\") AND greptime.public.test_table.message LIKE Utf8(\"%error%\") [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N, is_active:Boolean;N]\
\n    TableScan: greptime.public.test_table [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N, is_active:Boolean;N]";

        assert_eq!(plan.display_indent_schema().to_string(), expected);
    }
889
890    #[test]
891    fn test_escape_pattern() {
892        assert_eq!(escape_like_pattern("test"), "test");
893        assert_eq!(escape_like_pattern("te%st"), "te\\%st");
894        assert_eq!(escape_like_pattern("te_st"), "te\\_st");
895        assert_eq!(escape_like_pattern("te\\st"), "te\\\\st");
896    }
897
    #[tokio::test]
    async fn test_query_to_plan_with_aggr_func() {
        // An AggrFunc log expression becomes an Aggregate node on top of the
        // base Limit/Filter/TableScan plan, with `by` columns as the group-by
        // keys and `alias` renaming the aggregate output.
        let table_provider =
            build_test_table_provider(&[("public".to_string(), "test_table".to_string())]).await;
        let session_state = SessionStateBuilder::new().with_default_features().build();
        let mut planner = LogQueryPlanner::new(table_provider, session_state);

        let log_query = LogQuery {
            table: TableName::new(DEFAULT_CATALOG_NAME, "public", "test_table"),
            time_filter: TimeFilter {
                start: Some("2021-01-01T00:00:00Z".to_string()),
                end: Some("2021-01-02T00:00:00Z".to_string()),
                span: None,
            },
            filters: Default::default(),
            limit: Limit {
                skip: None,
                fetch: Some(100),
            },
            context: Context::None,
            columns: vec![],
            // count(message) grouped by host, aliased to `count_result`.
            exprs: vec![LogExpr::AggrFunc {
                name: "count".to_string(),
                args: vec![LogExpr::NamedIdent("message".to_string())],
                by: vec![LogExpr::NamedIdent("host".to_string())],
                range: None,
                alias: Some("count_result".to_string()),
            }],
        };

        let plan = planner.query_to_plan(log_query).await.unwrap();
        // Exact plan shape: Aggregate over Limit over Filter over TableScan.
        let expected = "Aggregate: groupBy=[[greptime.public.test_table.host]], aggr=[[count(greptime.public.test_table.message) AS count_result]] [host:Utf8;N, count_result:Int64]\
\n  Limit: skip=0, fetch=100 [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N, is_active:Boolean;N]\
\n    Filter: greptime.public.test_table.timestamp >= Utf8(\"2021-01-01T00:00:00Z\") AND greptime.public.test_table.timestamp <= Utf8(\"2021-01-02T00:00:00Z\") [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N, is_active:Boolean;N]\
\n      TableScan: greptime.public.test_table [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N, is_active:Boolean;N]";

        assert_eq!(plan.display_indent_schema().to_string(), expected);
    }
936
    #[tokio::test]
    async fn test_query_to_plan_with_scalar_func() {
        // A ScalarFunc log expression becomes a Projection on top of the base
        // Limit/Filter/TableScan plan, with `alias` naming the projected column.
        let table_provider =
            build_test_table_provider(&[("public".to_string(), "test_table".to_string())]).await;
        let session_state = SessionStateBuilder::new().with_default_features().build();
        let mut planner = LogQueryPlanner::new(table_provider, session_state);

        let log_query = LogQuery {
            table: TableName::new(DEFAULT_CATALOG_NAME, "public", "test_table"),
            time_filter: TimeFilter {
                start: Some("2021-01-01T00:00:00Z".to_string()),
                end: Some("2021-01-02T00:00:00Z".to_string()),
                span: None,
            },
            filters: Default::default(),
            limit: Limit {
                skip: None,
                fetch: Some(100),
            },
            context: Context::None,
            columns: vec![],
            // date_trunc(timestamp, 'day') aliased to `time_bucket`.
            exprs: vec![LogExpr::ScalarFunc {
                name: "date_trunc".to_string(),
                args: vec![
                    LogExpr::NamedIdent("timestamp".to_string()),
                    LogExpr::Literal("day".to_string()),
                ],
                alias: Some("time_bucket".to_string()),
            }],
        };

        let plan = planner.query_to_plan(log_query).await.unwrap();
        // Exact plan shape: Projection over Limit over Filter over TableScan.
        let expected = "Projection: date_trunc(greptime.public.test_table.timestamp, Utf8(\"day\")) AS time_bucket [time_bucket:Timestamp(Nanosecond, None);N]\
        \n  Limit: skip=0, fetch=100 [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N, is_active:Boolean;N]\
        \n    Filter: greptime.public.test_table.timestamp >= Utf8(\"2021-01-01T00:00:00Z\") AND greptime.public.test_table.timestamp <= Utf8(\"2021-01-02T00:00:00Z\") [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N, is_active:Boolean;N]\
        \n      TableScan: greptime.public.test_table [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N, is_active:Boolean;N]";

        assert_eq!(plan.display_indent_schema().to_string(), expected);
    }
976
977    #[tokio::test]
978    async fn test_build_content_filter_between() {
979        let table_provider =
980            build_test_table_provider(&[("public".to_string(), "test_table".to_string())]).await;
981        let session_state = SessionStateBuilder::new().with_default_features().build();
982        let planner = LogQueryPlanner::new(table_provider, session_state);
983        let schema = mock_schema();
984
985        let column_filter = ColumnFilters {
986            expr: Box::new(LogExpr::NamedIdent("message".to_string())),
987            filters: vec![ContentFilter::Between {
988                start: "a".to_string(),
989                end: "z".to_string(),
990                start_inclusive: true,
991                end_inclusive: false,
992            }],
993        };
994
995        let expr_option = planner
996            .build_column_filter(&column_filter, schema.arrow_schema())
997            .unwrap();
998        assert!(expr_option.is_some());
999
1000        let expr = expr_option.unwrap();
1001        let expected_expr = col("message")
1002            .gt_eq(lit(ScalarValue::Utf8(Some("a".to_string()))))
1003            .and(col("message").lt(lit(ScalarValue::Utf8(Some("z".to_string())))));
1004
1005        assert_eq!(format!("{:?}", expr), format!("{:?}", expected_expr));
1006    }
1007
    #[tokio::test]
    async fn test_query_to_plan_with_date_histogram() {
        // Date-histogram style query: a date_bin projection feeding a count
        // aggregation. The AggrFunc's PositionalIdent(0) refers back to the
        // projection output of the preceding expression.
        let table_provider =
            build_test_table_provider(&[("public".to_string(), "test_table".to_string())]).await;
        let session_state = SessionStateBuilder::new().with_default_features().build();
        let mut planner = LogQueryPlanner::new(table_provider, session_state);

        let log_query = LogQuery {
            table: TableName::new(DEFAULT_CATALOG_NAME, "public", "test_table"),
            time_filter: TimeFilter {
                start: Some("2021-01-01T00:00:00Z".to_string()),
                end: Some("2021-01-02T00:00:00Z".to_string()),
                span: None,
            },
            filters: Default::default(),
            limit: Limit {
                skip: Some(0),
                fetch: None,
            },
            context: Context::None,
            columns: vec![],
            exprs: vec![
                // Step 1: bucket timestamps into 30-second bins.
                LogExpr::ScalarFunc {
                    name: "date_bin".to_string(),
                    args: vec![
                        LogExpr::Literal("30 seconds".to_string()),
                        LogExpr::NamedIdent("timestamp".to_string()),
                    ],
                    alias: Some("2__date_histogram__time_bucket".to_string()),
                },
                // Step 2: count rows per bucket; arg 0 is the projection above.
                LogExpr::AggrFunc {
                    name: "count".to_string(),
                    args: vec![LogExpr::PositionalIdent(0)],
                    by: vec![LogExpr::NamedIdent(
                        "2__date_histogram__time_bucket".to_string(),
                    )],
                    range: None,
                    alias: Some("count_result".to_string()),
                },
            ],
        };

        let plan = planner.query_to_plan(log_query).await.unwrap();
        // Exact plan shape: Aggregate over Projection over Limit over Filter over TableScan.
        let expected = "Aggregate: groupBy=[[2__date_histogram__time_bucket]], aggr=[[count(2__date_histogram__time_bucket) AS count_result]] [2__date_histogram__time_bucket:Timestamp(Nanosecond, None);N, count_result:Int64]\
\n  Projection: date_bin(Utf8(\"30 seconds\"), greptime.public.test_table.timestamp) AS 2__date_histogram__time_bucket [2__date_histogram__time_bucket:Timestamp(Nanosecond, None);N]\
\n    Limit: skip=0, fetch=1000 [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N, is_active:Boolean;N]\
\n      Filter: greptime.public.test_table.timestamp >= Utf8(\"2021-01-01T00:00:00Z\") AND greptime.public.test_table.timestamp <= Utf8(\"2021-01-02T00:00:00Z\") [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N, is_active:Boolean;N]\
\n        TableScan: greptime.public.test_table [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N, is_active:Boolean;N]";

        assert_eq!(plan.display_indent_schema().to_string(), expected);
    }
1059
1060    #[tokio::test]
1061    async fn test_build_compound_filter() {
1062        let table_provider =
1063            build_test_table_provider(&[("public".to_string(), "test_table".to_string())]).await;
1064        let session_state = SessionStateBuilder::new().with_default_features().build();
1065        let planner = LogQueryPlanner::new(table_provider, session_state);
1066        let schema = mock_schema();
1067
1068        // Test AND compound
1069        let column_filter = ColumnFilters {
1070            expr: Box::new(LogExpr::NamedIdent("message".to_string())),
1071            filters: vec![
1072                ContentFilter::Contains("error".to_string()),
1073                ContentFilter::Prefix("WARN".to_string()),
1074            ],
1075        };
1076        let expr = planner
1077            .build_column_filter(&column_filter, schema.arrow_schema())
1078            .unwrap()
1079            .unwrap();
1080
1081        let expected_expr = col("message")
1082            .like(lit(ScalarValue::Utf8(Some("%error%".to_string()))))
1083            .and(col("message").like(lit(ScalarValue::Utf8(Some("WARN%".to_string())))));
1084
1085        assert_eq!(format!("{:?}", expr), format!("{:?}", expected_expr));
1086
1087        // Test OR compound - use Compound filter for OR logic
1088        let column_filter = ColumnFilters {
1089            expr: Box::new(LogExpr::NamedIdent("message".to_string())),
1090            filters: vec![ContentFilter::Compound(
1091                vec![
1092                    ContentFilter::Contains("error".to_string()),
1093                    ContentFilter::Prefix("WARN".to_string()),
1094                ],
1095                ConjunctionOperator::Or,
1096            )],
1097        };
1098        let expr = planner
1099            .build_column_filter(&column_filter, schema.arrow_schema())
1100            .unwrap()
1101            .unwrap();
1102
1103        let expected_expr = col("message")
1104            .like(lit(ScalarValue::Utf8(Some("%error%".to_string()))))
1105            .or(col("message").like(lit(ScalarValue::Utf8(Some("WARN%".to_string())))));
1106
1107        assert_eq!(format!("{:?}", expr), format!("{:?}", expected_expr));
1108
1109        // Test nested compound
1110        let column_filter = ColumnFilters {
1111            expr: Box::new(LogExpr::NamedIdent("message".to_string())),
1112            filters: vec![ContentFilter::Compound(
1113                vec![
1114                    ContentFilter::Contains("error".to_string()),
1115                    ContentFilter::Compound(
1116                        vec![
1117                            ContentFilter::Prefix("WARN".to_string()),
1118                            ContentFilter::Exact("DEBUG".to_string()),
1119                        ],
1120                        ConjunctionOperator::Or,
1121                    ),
1122                ],
1123                ConjunctionOperator::And,
1124            )],
1125        };
1126        let expr = planner
1127            .build_column_filter(&column_filter, schema.arrow_schema())
1128            .unwrap()
1129            .unwrap();
1130
1131        let expected_nested = col("message")
1132            .like(lit(ScalarValue::Utf8(Some("WARN%".to_string()))))
1133            .or(col("message").like(lit(ScalarValue::Utf8(Some("DEBUG".to_string())))));
1134        let expected_expr = col("message")
1135            .like(lit(ScalarValue::Utf8(Some("%error%".to_string()))))
1136            .and(expected_nested);
1137
1138        assert_eq!(format!("{:?}", expr), format!("{:?}", expected_expr));
1139    }
1140
1141    #[tokio::test]
1142    async fn test_build_great_than_filter() {
1143        let table_provider =
1144            build_test_table_provider(&[("public".to_string(), "test_table".to_string())]).await;
1145        let session_state = SessionStateBuilder::new().with_default_features().build();
1146        let planner = LogQueryPlanner::new(table_provider, session_state);
1147        let schema = mock_schema();
1148
1149        // Test GreatThan with inclusive=true
1150        let column_filter = ColumnFilters {
1151            expr: Box::new(LogExpr::NamedIdent("message".to_string())),
1152            filters: vec![ContentFilter::GreatThan {
1153                value: "error".to_string(),
1154                inclusive: true,
1155            }],
1156        };
1157
1158        let expr_option = planner
1159            .build_column_filter(&column_filter, schema.arrow_schema())
1160            .unwrap();
1161        assert!(expr_option.is_some());
1162
1163        let expr = expr_option.unwrap();
1164        let expected_expr = col("message").gt_eq(lit(ScalarValue::Utf8(Some("error".to_string()))));
1165
1166        assert_eq!(format!("{:?}", expr), format!("{:?}", expected_expr));
1167
1168        // Test GreatThan with inclusive=false
1169        let column_filter = ColumnFilters {
1170            expr: Box::new(LogExpr::NamedIdent("message".to_string())),
1171            filters: vec![ContentFilter::GreatThan {
1172                value: "error".to_string(),
1173                inclusive: false,
1174            }],
1175        };
1176
1177        let expr_option = planner
1178            .build_column_filter(&column_filter, schema.arrow_schema())
1179            .unwrap();
1180        assert!(expr_option.is_some());
1181
1182        let expr = expr_option.unwrap();
1183        let expected_expr = col("message").gt(lit(ScalarValue::Utf8(Some("error".to_string()))));
1184
1185        assert_eq!(format!("{:?}", expr), format!("{:?}", expected_expr));
1186    }
1187
1188    #[tokio::test]
1189    async fn test_build_less_than_filter() {
1190        let table_provider =
1191            build_test_table_provider(&[("public".to_string(), "test_table".to_string())]).await;
1192        let session_state = SessionStateBuilder::new().with_default_features().build();
1193        let planner = LogQueryPlanner::new(table_provider, session_state);
1194        let schema = mock_schema();
1195
1196        // Test LessThan with inclusive=true
1197        let column_filter = ColumnFilters {
1198            expr: Box::new(LogExpr::NamedIdent("message".to_string())),
1199            filters: vec![ContentFilter::LessThan {
1200                value: "error".to_string(),
1201                inclusive: true,
1202            }],
1203        };
1204
1205        let expr_option = planner
1206            .build_column_filter(&column_filter, schema.arrow_schema())
1207            .unwrap();
1208        assert!(expr_option.is_some());
1209
1210        let expr = expr_option.unwrap();
1211        let expected_expr = col("message").lt_eq(lit(ScalarValue::Utf8(Some("error".to_string()))));
1212
1213        assert_eq!(format!("{:?}", expr), format!("{:?}", expected_expr));
1214
1215        // Test LessThan with inclusive=false
1216        let column_filter = ColumnFilters {
1217            expr: Box::new(LogExpr::NamedIdent("message".to_string())),
1218            filters: vec![ContentFilter::LessThan {
1219                value: "error".to_string(),
1220                inclusive: false,
1221            }],
1222        };
1223
1224        let expr_option = planner
1225            .build_column_filter(&column_filter, schema.arrow_schema())
1226            .unwrap();
1227        assert!(expr_option.is_some());
1228
1229        let expr = expr_option.unwrap();
1230        let expected_expr = col("message").lt(lit(ScalarValue::Utf8(Some("error".to_string()))));
1231
1232        assert_eq!(format!("{:?}", expr), format!("{:?}", expected_expr));
1233    }
1234
1235    #[tokio::test]
1236    async fn test_build_in_filter() {
1237        let table_provider =
1238            build_test_table_provider(&[("public".to_string(), "test_table".to_string())]).await;
1239        let session_state = SessionStateBuilder::new().with_default_features().build();
1240        let planner = LogQueryPlanner::new(table_provider, session_state);
1241        let schema = mock_schema();
1242
1243        // Test In filter with multiple values
1244        let column_filter = ColumnFilters {
1245            expr: Box::new(LogExpr::NamedIdent("message".to_string())),
1246            filters: vec![ContentFilter::In(vec![
1247                "error".to_string(),
1248                "warning".to_string(),
1249                "info".to_string(),
1250            ])],
1251        };
1252
1253        let expr_option = planner
1254            .build_column_filter(&column_filter, schema.arrow_schema())
1255            .unwrap();
1256        assert!(expr_option.is_some());
1257
1258        let expr = expr_option.unwrap();
1259        let expected_expr = col("message").in_list(
1260            vec![
1261                lit(ScalarValue::Utf8(Some("error".to_string()))),
1262                lit(ScalarValue::Utf8(Some("warning".to_string()))),
1263                lit(ScalarValue::Utf8(Some("info".to_string()))),
1264            ],
1265            false,
1266        );
1267
1268        assert_eq!(format!("{:?}", expr), format!("{:?}", expected_expr));
1269    }
1270
1271    #[tokio::test]
1272    async fn test_build_is_true_filter() {
1273        let table_provider =
1274            build_test_table_provider(&[("public".to_string(), "test_table".to_string())]).await;
1275        let session_state = SessionStateBuilder::new().with_default_features().build();
1276        let planner = LogQueryPlanner::new(table_provider, session_state);
1277        let schema = mock_schema();
1278
1279        // Test IsTrue filter
1280        let column_filter = ColumnFilters {
1281            expr: Box::new(LogExpr::NamedIdent("is_active".to_string())),
1282            filters: vec![ContentFilter::IsTrue],
1283        };
1284
1285        let expr_option = planner
1286            .build_column_filter(&column_filter, schema.arrow_schema())
1287            .unwrap();
1288        assert!(expr_option.is_some());
1289
1290        let expr = expr_option.unwrap();
1291        let expected_expr_string =
1292            "IsTrue(Column(Column { relation: None, name: \"is_active\" }))".to_string();
1293
1294        assert_eq!(format!("{:?}", expr), expected_expr_string);
1295    }
1296
1297    #[tokio::test]
1298    async fn test_build_filter_with_scalar_fn() {
1299        let table_provider =
1300            build_test_table_provider(&[("public".to_string(), "test_table".to_string())]).await;
1301        let session_state = SessionStateBuilder::new().with_default_features().build();
1302        let planner = LogQueryPlanner::new(table_provider, session_state);
1303        let schema = mock_schema();
1304
1305        let column_filter = ColumnFilters {
1306            expr: Box::new(LogExpr::BinaryOp {
1307                left: Box::new(LogExpr::ScalarFunc {
1308                    name: "character_length".to_string(),
1309                    args: vec![LogExpr::NamedIdent("message".to_string())],
1310                    alias: None,
1311                }),
1312                op: BinaryOperator::Gt,
1313                right: Box::new(LogExpr::Literal("100".to_string())),
1314            }),
1315            filters: vec![ContentFilter::IsTrue],
1316        };
1317
1318        let expr_option = planner
1319            .build_column_filter(&column_filter, schema.arrow_schema())
1320            .unwrap();
1321        assert!(expr_option.is_some());
1322
1323        let expr = expr_option.unwrap();
1324        let expected_expr_string = "character_length(message) > Int32(100) IS TRUE";
1325
1326        assert_eq!(format!("{}", expr), expected_expr_string);
1327    }
1328
1329    #[tokio::test]
1330    async fn test_type_inference_float_comparison() {
1331        let table_provider = build_test_table_provider_with_typed_columns(&[(
1332            "public".to_string(),
1333            "test_table".to_string(),
1334        )])
1335        .await;
1336        let session_state = SessionStateBuilder::new().with_default_features().build();
1337        let planner = LogQueryPlanner::new(table_provider, session_state);
1338        let schema = mock_schema_with_typed_columns();
1339
1340        // Test Between with float column and string literals
1341        let column_filter = ColumnFilters {
1342            expr: Box::new(LogExpr::NamedIdent("score".to_string())),
1343            filters: vec![ContentFilter::Between {
1344                start: "75.5".to_string(),
1345                end: "100.0".to_string(),
1346                start_inclusive: true,
1347                end_inclusive: false,
1348            }],
1349        };
1350
1351        let expr_option = planner
1352            .build_column_filter(&column_filter, schema.arrow_schema())
1353            .unwrap();
1354        assert!(expr_option.is_some());
1355
1356        let expr = expr_option.unwrap();
1357        // Should infer literals as Float64 since score is a float64 column
1358        let expected_expr = col("score")
1359            .gt_eq(lit(ScalarValue::Float64(Some(75.5))))
1360            .and(col("score").lt(lit(ScalarValue::Float64(Some(100.0)))));
1361
1362        assert_eq!(format!("{:?}", expr), format!("{:?}", expected_expr));
1363    }
1364
1365    #[tokio::test]
1366    async fn test_type_inference_boolean_comparison() {
1367        let table_provider = build_test_table_provider_with_typed_columns(&[(
1368            "public".to_string(),
1369            "test_table".to_string(),
1370        )])
1371        .await;
1372        let session_state = SessionStateBuilder::new().with_default_features().build();
1373        let planner = LogQueryPlanner::new(table_provider, session_state);
1374        let schema = mock_schema_with_typed_columns();
1375
1376        // Test In filter with boolean column and string literals
1377        let column_filter = ColumnFilters {
1378            expr: Box::new(LogExpr::NamedIdent("is_active".to_string())),
1379            filters: vec![ContentFilter::In(vec![
1380                "true".to_string(),
1381                "1".to_string(),
1382                "false".to_string(),
1383            ])],
1384        };
1385
1386        let expr_option = planner
1387            .build_column_filter(&column_filter, schema.arrow_schema())
1388            .unwrap();
1389        assert!(expr_option.is_some());
1390
1391        let expr = expr_option.unwrap();
1392        // Should infer string literals as boolean values
1393        let expected_expr = col("is_active").in_list(
1394            vec![
1395                lit(ScalarValue::Boolean(Some(true))),
1396                lit(ScalarValue::Boolean(Some(true))),
1397                lit(ScalarValue::Boolean(Some(false))),
1398            ],
1399            false,
1400        );
1401
1402        assert_eq!(format!("{:?}", expr), format!("{:?}", expected_expr));
1403    }
1404
1405    #[tokio::test]
1406    async fn test_fallback_to_utf8_on_parse_failure() {
1407        let table_provider = build_test_table_provider_with_typed_columns(&[(
1408            "public".to_string(),
1409            "test_table".to_string(),
1410        )])
1411        .await;
1412        let session_state = SessionStateBuilder::new().with_default_features().build();
1413        let planner = LogQueryPlanner::new(table_provider, session_state);
1414        let schema = mock_schema_with_typed_columns();
1415
1416        // Test with invalid number format - should fallback to UTF8
1417        let column_filter = ColumnFilters {
1418            expr: Box::new(LogExpr::NamedIdent("age".to_string())),
1419            filters: vec![ContentFilter::GreatThan {
1420                value: "not_a_number".to_string(),
1421                inclusive: false,
1422            }],
1423        };
1424
1425        let expr_option = planner
1426            .build_column_filter(&column_filter, schema.arrow_schema())
1427            .unwrap();
1428        assert!(expr_option.is_some());
1429
1430        let expr = expr_option.unwrap();
1431        // Should fallback to UTF8 since "not_a_number" can't be parsed as int32
1432        let expected_expr = col("age").gt(lit(ScalarValue::Utf8(Some("not_a_number".to_string()))));
1433
1434        assert_eq!(format!("{:?}", expr), format!("{:?}", expected_expr));
1435    }
1436
1437    #[tokio::test]
1438    async fn test_string_column_remains_utf8() {
1439        let table_provider = build_test_table_provider_with_typed_columns(&[(
1440            "public".to_string(),
1441            "test_table".to_string(),
1442        )])
1443        .await;
1444        let session_state = SessionStateBuilder::new().with_default_features().build();
1445        let planner = LogQueryPlanner::new(table_provider, session_state);
1446        let schema = mock_schema_with_typed_columns();
1447
1448        // Test with string column - should remain UTF8 even if value looks like a number
1449        let column_filter = ColumnFilters {
1450            expr: Box::new(LogExpr::NamedIdent("message".to_string())),
1451            filters: vec![ContentFilter::GreatThan {
1452                value: "123".to_string(),
1453                inclusive: false,
1454            }],
1455        };
1456
1457        let expr_option = planner
1458            .build_column_filter(&column_filter, schema.arrow_schema())
1459            .unwrap();
1460        assert!(expr_option.is_some());
1461
1462        let expr = expr_option.unwrap();
1463        // Should remain UTF8 since message is a string column
1464        let expected_expr = col("message").gt(lit(ScalarValue::Utf8(Some("123".to_string()))));
1465
1466        assert_eq!(format!("{:?}", expr), format!("{:?}", expected_expr));
1467    }
1468
1469    #[tokio::test]
1470    async fn test_all_binary_operators() {
1471        let table_provider = build_test_table_provider_with_typed_columns(&[(
1472            "public".to_string(),
1473            "test_table".to_string(),
1474        )])
1475        .await;
1476        let session_state = SessionStateBuilder::new().with_default_features().build();
1477        let planner = LogQueryPlanner::new(table_provider, session_state);
1478        let schema = mock_schema_with_typed_columns();
1479
1480        let df_schema = DFSchema::try_from(schema.arrow_schema().clone()).unwrap();
1481
1482        // Test all comparison operators
1483        let test_cases = vec![
1484            (BinaryOperator::Eq, Operator::Eq),
1485            (BinaryOperator::Ne, Operator::NotEq),
1486            (BinaryOperator::Lt, Operator::Lt),
1487            (BinaryOperator::Le, Operator::LtEq),
1488            (BinaryOperator::Gt, Operator::Gt),
1489            (BinaryOperator::Ge, Operator::GtEq),
1490            (BinaryOperator::Plus, Operator::Plus),
1491            (BinaryOperator::Minus, Operator::Minus),
1492            (BinaryOperator::Multiply, Operator::Multiply),
1493            (BinaryOperator::Divide, Operator::Divide),
1494            (BinaryOperator::Modulo, Operator::Modulo),
1495            (BinaryOperator::And, Operator::And),
1496            (BinaryOperator::Or, Operator::Or),
1497        ];
1498
1499        for (binary_op, expected_df_op) in test_cases {
1500            let binary_expr = LogExpr::BinaryOp {
1501                left: Box::new(LogExpr::NamedIdent("age".to_string())),
1502                op: binary_op,
1503                right: Box::new(LogExpr::Literal("25".to_string())),
1504            };
1505
1506            let expr = planner
1507                .log_expr_to_df_expr(&binary_expr, &df_schema)
1508                .unwrap();
1509
1510            let expected_expr = Expr::BinaryExpr(BinaryExpr {
1511                left: Box::new(col("age")),
1512                op: expected_df_op,
1513                right: Box::new(lit(ScalarValue::Int32(Some(25)))),
1514            });
1515
1516            assert_eq!(format!("{:?}", expr), format!("{:?}", expected_expr));
1517        }
1518    }
1519
1520    #[tokio::test]
1521    async fn test_nested_binary_operations() {
1522        let table_provider = build_test_table_provider_with_typed_columns(&[(
1523            "public".to_string(),
1524            "test_table".to_string(),
1525        )])
1526        .await;
1527        let session_state = SessionStateBuilder::new().with_default_features().build();
1528        let planner = LogQueryPlanner::new(table_provider, session_state);
1529        let schema = mock_schema_with_typed_columns();
1530
1531        let df_schema = DFSchema::try_from(schema.arrow_schema().clone()).unwrap();
1532
1533        // Test nested binary operations: (age + 5) > 30
1534        let nested_binary_expr = LogExpr::BinaryOp {
1535            left: Box::new(LogExpr::BinaryOp {
1536                left: Box::new(LogExpr::NamedIdent("age".to_string())),
1537                op: BinaryOperator::Plus,
1538                right: Box::new(LogExpr::Literal("5".to_string())),
1539            }),
1540            op: BinaryOperator::Gt,
1541            right: Box::new(LogExpr::Literal("30".to_string())),
1542        };
1543
1544        let expr = planner
1545            .log_expr_to_df_expr(&nested_binary_expr, &df_schema)
1546            .unwrap();
1547
1548        // Verify the nested structure is properly created
1549        let expected_expr_debug = r#"BinaryExpr(BinaryExpr { left: BinaryExpr(BinaryExpr { left: Column(Column { relation: None, name: "age" }), op: Plus, right: Literal(Int32(5), None) }), op: Gt, right: Literal(Int32(30), None) })"#;
1550        assert_eq!(format!("{:?}", expr), expected_expr_debug);
1551    }
1552}