query/log_query/
planner.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use catalog::table_source::DfTableSourceProvider;
16use common_function::utils::escape_like_pattern;
17use datafusion::datasource::DefaultTableSource;
18use datafusion::execution::SessionState;
19use datafusion_common::{DFSchema, ScalarValue};
20use datafusion_expr::utils::conjunction;
21use datafusion_expr::{col, lit, Expr, LogicalPlan, LogicalPlanBuilder};
22use datafusion_sql::TableReference;
23use datatypes::schema::Schema;
24use log_query::{ColumnFilters, LogExpr, LogQuery, TimeFilter};
25use snafu::{OptionExt, ResultExt};
26use table::table::adapter::DfTableProviderAdapter;
27
28use crate::log_query::error::{
29    CatalogSnafu, DataFusionPlanningSnafu, Result, TimeIndexNotFoundSnafu, UnexpectedLogExprSnafu,
30    UnimplementedSnafu, UnknownAggregateFunctionSnafu, UnknownScalarFunctionSnafu,
31    UnknownTableSnafu,
32};
33
34const DEFAULT_LIMIT: usize = 1000;
35
36pub struct LogQueryPlanner {
37    table_provider: DfTableSourceProvider,
38    session_state: SessionState,
39}
40
41impl LogQueryPlanner {
42    pub fn new(table_provider: DfTableSourceProvider, session_state: SessionState) -> Self {
43        Self {
44            table_provider,
45            session_state,
46        }
47    }
48
49    pub async fn query_to_plan(&mut self, query: LogQuery) -> Result<LogicalPlan> {
50        // Resolve table
51        let table_ref: TableReference = query.table.table_ref().into();
52        let table_source = self
53            .table_provider
54            .resolve_table(table_ref.clone())
55            .await
56            .context(CatalogSnafu)?;
57        let schema = table_source
58            .as_any()
59            .downcast_ref::<DefaultTableSource>()
60            .context(UnknownTableSnafu)?
61            .table_provider
62            .as_any()
63            .downcast_ref::<DfTableProviderAdapter>()
64            .context(UnknownTableSnafu)?
65            .table()
66            .schema();
67
68        // Build the initial scan plan
69        let mut plan_builder = LogicalPlanBuilder::scan(table_ref, table_source, None)
70            .context(DataFusionPlanningSnafu)?;
71
72        // Collect filter expressions
73        let mut filters = Vec::new();
74
75        // Time filter
76        filters.push(self.build_time_filter(&query.time_filter, &schema)?);
77
78        // Column filters
79        for column_filter in &query.filters {
80            if let Some(expr) = self.build_column_filter(column_filter)? {
81                filters.push(expr);
82            }
83        }
84
85        // Apply filters
86        if !filters.is_empty() {
87            let filter_expr = filters.into_iter().reduce(|a, b| a.and(b)).unwrap();
88            plan_builder = plan_builder
89                .filter(filter_expr)
90                .context(DataFusionPlanningSnafu)?;
91        }
92
93        // Apply projections
94        if !query.columns.is_empty() {
95            let projected_columns = query.columns.iter().map(col).collect::<Vec<_>>();
96            plan_builder = plan_builder
97                .project(projected_columns)
98                .context(DataFusionPlanningSnafu)?;
99        }
100
101        // Apply limit
102        plan_builder = plan_builder
103            .limit(
104                query.limit.skip.unwrap_or(0),
105                Some(query.limit.fetch.unwrap_or(DEFAULT_LIMIT)),
106            )
107            .context(DataFusionPlanningSnafu)?;
108
109        // Apply log expressions
110        for expr in &query.exprs {
111            plan_builder = self.process_log_expr(plan_builder, expr)?;
112        }
113
114        // Build the final plan
115        let plan = plan_builder.build().context(DataFusionPlanningSnafu)?;
116
117        Ok(plan)
118    }
119
120    fn build_time_filter(&self, time_filter: &TimeFilter, schema: &Schema) -> Result<Expr> {
121        let timestamp_col = schema
122            .timestamp_column()
123            .with_context(|| TimeIndexNotFoundSnafu {})?
124            .name
125            .clone();
126
127        let start_time = ScalarValue::Utf8(time_filter.start.clone());
128        let end_time = ScalarValue::Utf8(
129            time_filter
130                .end
131                .clone()
132                .or(Some("9999-12-31T23:59:59Z".to_string())),
133        );
134        let expr = col(timestamp_col.clone())
135            .gt_eq(lit(start_time))
136            .and(col(timestamp_col).lt_eq(lit(end_time)));
137
138        Ok(expr)
139    }
140
141    /// Returns filter expressions
142    fn build_column_filter(&self, column_filter: &ColumnFilters) -> Result<Option<Expr>> {
143        if column_filter.filters.is_empty() {
144            return Ok(None);
145        }
146
147        self.build_content_filters(&column_filter.column_name, &column_filter.filters)
148    }
149
150    /// Builds filter expressions from content filters for a specific column
151    fn build_content_filters(
152        &self,
153        column_name: &str,
154        filters: &[log_query::ContentFilter],
155    ) -> Result<Option<Expr>> {
156        if filters.is_empty() {
157            return Ok(None);
158        }
159
160        let exprs = filters
161            .iter()
162            .map(|filter| self.build_content_filter(column_name, filter))
163            .try_collect::<Vec<_>>()?;
164
165        Ok(conjunction(exprs))
166    }
167
168    /// Builds a single content filter expression
169    #[allow(clippy::only_used_in_recursion)]
170    fn build_content_filter(
171        &self,
172        column_name: &str,
173        filter: &log_query::ContentFilter,
174    ) -> Result<Expr> {
175        match filter {
176            log_query::ContentFilter::Exact(pattern) => {
177                Ok(col(column_name)
178                    .like(lit(ScalarValue::Utf8(Some(escape_like_pattern(pattern))))))
179            }
180            log_query::ContentFilter::Prefix(pattern) => Ok(col(column_name).like(lit(
181                ScalarValue::Utf8(Some(format!("{}%", escape_like_pattern(pattern)))),
182            ))),
183            log_query::ContentFilter::Postfix(pattern) => Ok(col(column_name).like(lit(
184                ScalarValue::Utf8(Some(format!("%{}", escape_like_pattern(pattern)))),
185            ))),
186            log_query::ContentFilter::Contains(pattern) => Ok(col(column_name).like(lit(
187                ScalarValue::Utf8(Some(format!("%{}%", escape_like_pattern(pattern)))),
188            ))),
189            log_query::ContentFilter::Regex(..) => Err::<Expr, _>(
190                UnimplementedSnafu {
191                    feature: "regex filter",
192                }
193                .build(),
194            ),
195            log_query::ContentFilter::Exist => Ok(col(column_name).is_not_null()),
196            log_query::ContentFilter::Between {
197                start,
198                end,
199                start_inclusive,
200                end_inclusive,
201            } => {
202                let left = if *start_inclusive {
203                    Expr::gt_eq
204                } else {
205                    Expr::gt
206                };
207                let right = if *end_inclusive {
208                    Expr::lt_eq
209                } else {
210                    Expr::lt
211                };
212                Ok(left(
213                    col(column_name),
214                    lit(ScalarValue::Utf8(Some(escape_like_pattern(start)))),
215                )
216                .and(right(
217                    col(column_name),
218                    lit(ScalarValue::Utf8(Some(escape_like_pattern(end)))),
219                )))
220            }
221            log_query::ContentFilter::GreatThan { value, inclusive } => {
222                let expr = if *inclusive { Expr::gt_eq } else { Expr::gt };
223                Ok(expr(
224                    col(column_name),
225                    lit(ScalarValue::Utf8(Some(escape_like_pattern(value)))),
226                ))
227            }
228            log_query::ContentFilter::LessThan { value, inclusive } => {
229                let expr = if *inclusive { Expr::lt_eq } else { Expr::lt };
230                Ok(expr(
231                    col(column_name),
232                    lit(ScalarValue::Utf8(Some(escape_like_pattern(value)))),
233                ))
234            }
235            log_query::ContentFilter::In(values) => {
236                let list = values
237                    .iter()
238                    .map(|value| lit(ScalarValue::Utf8(Some(escape_like_pattern(value)))))
239                    .collect();
240                Ok(col(column_name).in_list(list, false))
241            }
242            log_query::ContentFilter::Compound(filters, op) => {
243                let exprs = filters
244                    .iter()
245                    .map(|filter| self.build_content_filter(column_name, filter))
246                    .try_collect::<Vec<_>>()?;
247
248                match op {
249                    log_query::BinaryOperator::And => Ok(conjunction(exprs).unwrap()),
250                    log_query::BinaryOperator::Or => {
251                        // Build a disjunction (OR) of expressions
252                        Ok(exprs.into_iter().reduce(|a, b| a.or(b)).unwrap())
253                    }
254                }
255            }
256        }
257    }
258
259    fn build_aggr_func(
260        &self,
261        schema: &DFSchema,
262        fn_name: &str,
263        args: &[LogExpr],
264        by: &[LogExpr],
265    ) -> Result<(Expr, Vec<Expr>)> {
266        let aggr_fn = self
267            .session_state
268            .aggregate_functions()
269            .get(fn_name)
270            .context(UnknownAggregateFunctionSnafu {
271                name: fn_name.to_string(),
272            })?;
273        let args = args
274            .iter()
275            .map(|expr| self.log_expr_to_column_expr(expr, schema))
276            .try_collect::<Vec<_>>()?;
277        let group_exprs = by
278            .iter()
279            .map(|expr| self.log_expr_to_column_expr(expr, schema))
280            .try_collect::<Vec<_>>()?;
281        let aggr_expr = aggr_fn.call(args);
282
283        Ok((aggr_expr, group_exprs))
284    }
285
286    /// Converts a log expression to a column expression.
287    ///
288    /// A column expression here can be a column identifier, a positional identifier, or a literal.
289    /// They don't rely on the context of the query or other columns.
290    fn log_expr_to_column_expr(&self, expr: &LogExpr, schema: &DFSchema) -> Result<Expr> {
291        match expr {
292            LogExpr::NamedIdent(name) => Ok(col(name)),
293            LogExpr::PositionalIdent(index) => Ok(col(schema.field(*index).name())),
294            LogExpr::Literal(literal) => Ok(lit(ScalarValue::Utf8(Some(literal.clone())))),
295            _ => UnexpectedLogExprSnafu {
296                expr: expr.clone(),
297                expected: "named identifier, positional identifier, or literal",
298            }
299            .fail(),
300        }
301    }
302
303    fn build_scalar_func(&self, schema: &DFSchema, name: &str, args: &[LogExpr]) -> Result<Expr> {
304        let args = args
305            .iter()
306            .map(|expr| self.log_expr_to_column_expr(expr, schema))
307            .try_collect::<Vec<_>>()?;
308        let func = self.session_state.scalar_functions().get(name).context(
309            UnknownScalarFunctionSnafu {
310                name: name.to_string(),
311            },
312        )?;
313        let expr = func.call(args);
314
315        Ok(expr)
316    }
317
318    /// Process LogExpr recursively.
319    ///
320    /// Return the [`LogicalPlanBuilder`] after modification and the resulting expression's names.
321    fn process_log_expr(
322        &self,
323        plan_builder: LogicalPlanBuilder,
324        expr: &LogExpr,
325    ) -> Result<LogicalPlanBuilder> {
326        let mut plan_builder = plan_builder;
327
328        match expr {
329            LogExpr::AggrFunc {
330                name,
331                args,
332                by,
333                range: _range,
334                alias,
335            } => {
336                let schema = plan_builder.schema();
337                let (mut aggr_expr, group_exprs) = self.build_aggr_func(schema, name, args, by)?;
338                if let Some(alias) = alias {
339                    aggr_expr = aggr_expr.alias(alias);
340                }
341
342                plan_builder = plan_builder
343                    .aggregate(group_exprs, [aggr_expr.clone()])
344                    .context(DataFusionPlanningSnafu)?;
345            }
346            LogExpr::Filter { expr, filter } => {
347                let schema = plan_builder.schema();
348                let expr = self.log_expr_to_column_expr(expr, schema)?;
349
350                let col_name = expr.schema_name().to_string();
351                let filter_expr = self.build_content_filter(&col_name, filter)?;
352                plan_builder = plan_builder
353                    .filter(filter_expr)
354                    .context(DataFusionPlanningSnafu)?;
355            }
356            LogExpr::ScalarFunc { name, args, alias } => {
357                let schema = plan_builder.schema();
358                let mut expr = self.build_scalar_func(schema, name, args)?;
359                if let Some(alias) = alias {
360                    expr = expr.alias(alias);
361                }
362                plan_builder = plan_builder
363                    .project([expr.clone()])
364                    .context(DataFusionPlanningSnafu)?;
365            }
366            LogExpr::NamedIdent(_) | LogExpr::PositionalIdent(_) => {
367                // nothing to do, return empty vec.
368            }
369            LogExpr::Alias { expr, alias } => {
370                let expr = self.log_expr_to_column_expr(expr, plan_builder.schema())?;
371                let aliased_expr = expr.alias(alias);
372                plan_builder = plan_builder
373                    .project([aliased_expr.clone()])
374                    .context(DataFusionPlanningSnafu)?;
375            }
376            _ => {
377                UnimplementedSnafu {
378                    feature: "log expression",
379                }
380                .fail()?;
381            }
382        }
383        Ok(plan_builder)
384    }
385}
386
387#[cfg(test)]
388mod tests {
389    use std::sync::Arc;
390
391    use catalog::memory::MemoryCatalogManager;
392    use catalog::RegisterTableRequest;
393    use common_catalog::consts::DEFAULT_CATALOG_NAME;
394    use common_query::test_util::DummyDecoder;
395    use datafusion::execution::SessionStateBuilder;
396    use datatypes::prelude::ConcreteDataType;
397    use datatypes::schema::{ColumnSchema, SchemaRef};
398    use log_query::{BinaryOperator, ContentFilter, Context, Limit};
399    use session::context::QueryContext;
400    use table::metadata::{TableInfoBuilder, TableMetaBuilder};
401    use table::table_name::TableName;
402    use table::test_util::EmptyTable;
403
404    use super::*;
405
406    fn mock_schema() -> SchemaRef {
407        let columns = vec![
408            ColumnSchema::new(
409                "message".to_string(),
410                ConcreteDataType::string_datatype(),
411                false,
412            ),
413            ColumnSchema::new(
414                "timestamp".to_string(),
415                ConcreteDataType::timestamp_millisecond_datatype(),
416                false,
417            )
418            .with_time_index(true),
419            ColumnSchema::new(
420                "host".to_string(),
421                ConcreteDataType::string_datatype(),
422                true,
423            ),
424        ];
425
426        Arc::new(Schema::new(columns))
427    }
428
429    /// Registers table under `greptime`, with `message` and `timestamp` and `host` columns.
430    async fn build_test_table_provider(
431        table_name_tuples: &[(String, String)],
432    ) -> DfTableSourceProvider {
433        let catalog_list = MemoryCatalogManager::with_default_setup();
434        for (schema_name, table_name) in table_name_tuples {
435            let schema = mock_schema();
436            let table_meta = TableMetaBuilder::empty()
437                .schema(schema)
438                .primary_key_indices(vec![2])
439                .value_indices(vec![0])
440                .next_column_id(1024)
441                .build()
442                .unwrap();
443            let table_info = TableInfoBuilder::default()
444                .name(table_name.to_string())
445                .meta(table_meta)
446                .build()
447                .unwrap();
448            let table = EmptyTable::from_table_info(&table_info);
449
450            catalog_list
451                .register_table_sync(RegisterTableRequest {
452                    catalog: DEFAULT_CATALOG_NAME.to_string(),
453                    schema: schema_name.to_string(),
454                    table_name: table_name.to_string(),
455                    table_id: 1024,
456                    table,
457                })
458                .unwrap();
459        }
460
461        DfTableSourceProvider::new(
462            catalog_list,
463            false,
464            QueryContext::arc(),
465            DummyDecoder::arc(),
466            false,
467        )
468    }
469
470    #[tokio::test]
471    async fn test_query_to_plan() {
472        let table_provider =
473            build_test_table_provider(&[("public".to_string(), "test_table".to_string())]).await;
474        let session_state = SessionStateBuilder::new().with_default_features().build();
475        let mut planner = LogQueryPlanner::new(table_provider, session_state);
476
477        let log_query = LogQuery {
478            table: TableName::new(DEFAULT_CATALOG_NAME, "public", "test_table"),
479            time_filter: TimeFilter {
480                start: Some("2021-01-01T00:00:00Z".to_string()),
481                end: Some("2021-01-02T00:00:00Z".to_string()),
482                span: None,
483            },
484            filters: vec![ColumnFilters {
485                column_name: "message".to_string(),
486                filters: vec![ContentFilter::Contains("error".to_string())],
487            }],
488            limit: Limit {
489                skip: None,
490                fetch: Some(100),
491            },
492            context: Context::None,
493            columns: vec![],
494            exprs: vec![],
495        };
496
497        let plan = planner.query_to_plan(log_query).await.unwrap();
498        let expected = "Limit: skip=0, fetch=100 [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N]\
499\n  Filter: greptime.public.test_table.timestamp >= Utf8(\"2021-01-01T00:00:00Z\") AND greptime.public.test_table.timestamp <= Utf8(\"2021-01-02T00:00:00Z\") AND greptime.public.test_table.message LIKE Utf8(\"%error%\") [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N]\
500\n    TableScan: greptime.public.test_table [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N]";
501
502        assert_eq!(plan.display_indent_schema().to_string(), expected);
503    }
504
505    #[tokio::test]
506    async fn test_build_time_filter() {
507        let table_provider =
508            build_test_table_provider(&[("public".to_string(), "test_table".to_string())]).await;
509        let session_state = SessionStateBuilder::new().with_default_features().build();
510        let planner = LogQueryPlanner::new(table_provider, session_state);
511
512        let time_filter = TimeFilter {
513            start: Some("2021-01-01T00:00:00Z".to_string()),
514            end: Some("2021-01-02T00:00:00Z".to_string()),
515            span: None,
516        };
517
518        let expr = planner
519            .build_time_filter(&time_filter, &mock_schema())
520            .unwrap();
521
522        let expected_expr = col("timestamp")
523            .gt_eq(lit(ScalarValue::Utf8(Some(
524                "2021-01-01T00:00:00Z".to_string(),
525            ))))
526            .and(col("timestamp").lt_eq(lit(ScalarValue::Utf8(Some(
527                "2021-01-02T00:00:00Z".to_string(),
528            )))));
529
530        assert_eq!(format!("{:?}", expr), format!("{:?}", expected_expr));
531    }
532
533    #[tokio::test]
534    async fn test_build_time_filter_without_end() {
535        let table_provider =
536            build_test_table_provider(&[("public".to_string(), "test_table".to_string())]).await;
537        let session_state = SessionStateBuilder::new().with_default_features().build();
538        let planner = LogQueryPlanner::new(table_provider, session_state);
539
540        let time_filter = TimeFilter {
541            start: Some("2021-01-01T00:00:00Z".to_string()),
542            end: None,
543            span: None,
544        };
545
546        let expr = planner
547            .build_time_filter(&time_filter, &mock_schema())
548            .unwrap();
549
550        let expected_expr = col("timestamp")
551            .gt_eq(lit(ScalarValue::Utf8(Some(
552                "2021-01-01T00:00:00Z".to_string(),
553            ))))
554            .and(col("timestamp").lt_eq(lit(ScalarValue::Utf8(Some(
555                "9999-12-31T23:59:59Z".to_string(),
556            )))));
557
558        assert_eq!(format!("{:?}", expr), format!("{:?}", expected_expr));
559    }
560
561    #[tokio::test]
562    async fn test_build_column_filter() {
563        let table_provider =
564            build_test_table_provider(&[("public".to_string(), "test_table".to_string())]).await;
565        let session_state = SessionStateBuilder::new().with_default_features().build();
566        let planner = LogQueryPlanner::new(table_provider, session_state);
567
568        let column_filter = ColumnFilters {
569            column_name: "message".to_string(),
570            filters: vec![
571                ContentFilter::Contains("error".to_string()),
572                ContentFilter::Prefix("WARN".to_string()),
573            ],
574        };
575
576        let expr_option = planner.build_column_filter(&column_filter).unwrap();
577        assert!(expr_option.is_some());
578
579        let expr = expr_option.unwrap();
580
581        let expected_expr = col("message")
582            .like(lit(ScalarValue::Utf8(Some("%error%".to_string()))))
583            .and(col("message").like(lit(ScalarValue::Utf8(Some("WARN%".to_string())))));
584
585        assert_eq!(format!("{:?}", expr), format!("{:?}", expected_expr));
586    }
587
588    #[tokio::test]
589    async fn test_query_to_plan_with_only_skip() {
590        let table_provider =
591            build_test_table_provider(&[("public".to_string(), "test_table".to_string())]).await;
592        let session_state = SessionStateBuilder::new().with_default_features().build();
593        let mut planner = LogQueryPlanner::new(table_provider, session_state);
594
595        let log_query = LogQuery {
596            table: TableName::new(DEFAULT_CATALOG_NAME, "public", "test_table"),
597            time_filter: TimeFilter {
598                start: Some("2021-01-01T00:00:00Z".to_string()),
599                end: Some("2021-01-02T00:00:00Z".to_string()),
600                span: None,
601            },
602            filters: vec![ColumnFilters {
603                column_name: "message".to_string(),
604                filters: vec![ContentFilter::Contains("error".to_string())],
605            }],
606            limit: Limit {
607                skip: Some(10),
608                fetch: None,
609            },
610            context: Context::None,
611            columns: vec![],
612            exprs: vec![],
613        };
614
615        let plan = planner.query_to_plan(log_query).await.unwrap();
616        let expected = "Limit: skip=10, fetch=1000 [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N]\
617\n  Filter: greptime.public.test_table.timestamp >= Utf8(\"2021-01-01T00:00:00Z\") AND greptime.public.test_table.timestamp <= Utf8(\"2021-01-02T00:00:00Z\") AND greptime.public.test_table.message LIKE Utf8(\"%error%\") [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N]\
618\n    TableScan: greptime.public.test_table [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N]";
619
620        assert_eq!(plan.display_indent_schema().to_string(), expected);
621    }
622
623    #[tokio::test]
624    async fn test_query_to_plan_without_limit() {
625        let table_provider =
626            build_test_table_provider(&[("public".to_string(), "test_table".to_string())]).await;
627        let session_state = SessionStateBuilder::new().with_default_features().build();
628        let mut planner = LogQueryPlanner::new(table_provider, session_state);
629
630        let log_query = LogQuery {
631            table: TableName::new(DEFAULT_CATALOG_NAME, "public", "test_table"),
632            time_filter: TimeFilter {
633                start: Some("2021-01-01T00:00:00Z".to_string()),
634                end: Some("2021-01-02T00:00:00Z".to_string()),
635                span: None,
636            },
637            filters: vec![ColumnFilters {
638                column_name: "message".to_string(),
639                filters: vec![ContentFilter::Contains("error".to_string())],
640            }],
641            limit: Limit {
642                skip: None,
643                fetch: None,
644            },
645            context: Context::None,
646            columns: vec![],
647            exprs: vec![],
648        };
649
650        let plan = planner.query_to_plan(log_query).await.unwrap();
651        let expected = "Limit: skip=0, fetch=1000 [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N]\
652\n  Filter: greptime.public.test_table.timestamp >= Utf8(\"2021-01-01T00:00:00Z\") AND greptime.public.test_table.timestamp <= Utf8(\"2021-01-02T00:00:00Z\") AND greptime.public.test_table.message LIKE Utf8(\"%error%\") [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N]\
653\n    TableScan: greptime.public.test_table [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N]";
654
655        assert_eq!(plan.display_indent_schema().to_string(), expected);
656    }
657
658    #[test]
659    fn test_escape_pattern() {
660        assert_eq!(escape_like_pattern("test"), "test");
661        assert_eq!(escape_like_pattern("te%st"), "te\\%st");
662        assert_eq!(escape_like_pattern("te_st"), "te\\_st");
663        assert_eq!(escape_like_pattern("te\\st"), "te\\\\st");
664    }
665
666    #[tokio::test]
667    async fn test_query_to_plan_with_aggr_func() {
668        let table_provider =
669            build_test_table_provider(&[("public".to_string(), "test_table".to_string())]).await;
670        let session_state = SessionStateBuilder::new().with_default_features().build();
671        let mut planner = LogQueryPlanner::new(table_provider, session_state);
672
673        let log_query = LogQuery {
674            table: TableName::new(DEFAULT_CATALOG_NAME, "public", "test_table"),
675            time_filter: TimeFilter {
676                start: Some("2021-01-01T00:00:00Z".to_string()),
677                end: Some("2021-01-02T00:00:00Z".to_string()),
678                span: None,
679            },
680            filters: vec![],
681            limit: Limit {
682                skip: None,
683                fetch: Some(100),
684            },
685            context: Context::None,
686            columns: vec![],
687            exprs: vec![LogExpr::AggrFunc {
688                name: "count".to_string(),
689                args: vec![LogExpr::NamedIdent("message".to_string())],
690                by: vec![LogExpr::NamedIdent("host".to_string())],
691                range: None,
692                alias: Some("count_result".to_string()),
693            }],
694        };
695
696        let plan = planner.query_to_plan(log_query).await.unwrap();
697        let expected = "Aggregate: groupBy=[[greptime.public.test_table.host]], aggr=[[count(greptime.public.test_table.message) AS count_result]] [host:Utf8;N, count_result:Int64]\
698\n  Limit: skip=0, fetch=100 [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N]\
699\n    Filter: greptime.public.test_table.timestamp >= Utf8(\"2021-01-01T00:00:00Z\") AND greptime.public.test_table.timestamp <= Utf8(\"2021-01-02T00:00:00Z\") [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N]\
700\n      TableScan: greptime.public.test_table [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N]";
701
702        assert_eq!(plan.display_indent_schema().to_string(), expected);
703    }
704
705    #[tokio::test]
706    async fn test_query_to_plan_with_scalar_func() {
707        let table_provider =
708            build_test_table_provider(&[("public".to_string(), "test_table".to_string())]).await;
709        let session_state = SessionStateBuilder::new().with_default_features().build();
710        let mut planner = LogQueryPlanner::new(table_provider, session_state);
711
712        let log_query = LogQuery {
713            table: TableName::new(DEFAULT_CATALOG_NAME, "public", "test_table"),
714            time_filter: TimeFilter {
715                start: Some("2021-01-01T00:00:00Z".to_string()),
716                end: Some("2021-01-02T00:00:00Z".to_string()),
717                span: None,
718            },
719            filters: vec![],
720            limit: Limit {
721                skip: None,
722                fetch: Some(100),
723            },
724            context: Context::None,
725            columns: vec![],
726            exprs: vec![LogExpr::ScalarFunc {
727                name: "date_trunc".to_string(),
728                args: vec![
729                    LogExpr::NamedIdent("timestamp".to_string()),
730                    LogExpr::Literal("day".to_string()),
731                ],
732                alias: Some("time_bucket".to_string()),
733            }],
734        };
735
736        let plan = planner.query_to_plan(log_query).await.unwrap();
737        let expected = "Projection: date_trunc(greptime.public.test_table.timestamp, Utf8(\"day\")) AS time_bucket [time_bucket:Timestamp(Nanosecond, None);N]\
738        \n  Limit: skip=0, fetch=100 [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N]\
739        \n    Filter: greptime.public.test_table.timestamp >= Utf8(\"2021-01-01T00:00:00Z\") AND greptime.public.test_table.timestamp <= Utf8(\"2021-01-02T00:00:00Z\") [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N]\
740        \n      TableScan: greptime.public.test_table [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N]";
741
742        assert_eq!(plan.display_indent_schema().to_string(), expected);
743    }
744
745    #[tokio::test]
746    async fn test_build_column_filter_between() {
747        let table_provider =
748            build_test_table_provider(&[("public".to_string(), "test_table".to_string())]).await;
749        let session_state = SessionStateBuilder::new().with_default_features().build();
750        let planner = LogQueryPlanner::new(table_provider, session_state);
751
752        let column_filter = ColumnFilters {
753            column_name: "message".to_string(),
754            filters: vec![ContentFilter::Between {
755                start: "a".to_string(),
756                end: "z".to_string(),
757                start_inclusive: true,
758                end_inclusive: false,
759            }],
760        };
761
762        let expr_option = planner.build_column_filter(&column_filter).unwrap();
763        assert!(expr_option.is_some());
764
765        let expr = expr_option.unwrap();
766        let expected_expr = col("message")
767            .gt_eq(lit(ScalarValue::Utf8(Some("a".to_string()))))
768            .and(col("message").lt(lit(ScalarValue::Utf8(Some("z".to_string())))));
769
770        assert_eq!(format!("{:?}", expr), format!("{:?}", expected_expr));
771    }
772
773    #[tokio::test]
774    async fn test_query_to_plan_with_date_histogram() {
775        let table_provider =
776            build_test_table_provider(&[("public".to_string(), "test_table".to_string())]).await;
777        let session_state = SessionStateBuilder::new().with_default_features().build();
778        let mut planner = LogQueryPlanner::new(table_provider, session_state);
779
780        let log_query = LogQuery {
781            table: TableName::new(DEFAULT_CATALOG_NAME, "public", "test_table"),
782            time_filter: TimeFilter {
783                start: Some("2021-01-01T00:00:00Z".to_string()),
784                end: Some("2021-01-02T00:00:00Z".to_string()),
785                span: None,
786            },
787            filters: vec![],
788            limit: Limit {
789                skip: Some(0),
790                fetch: None,
791            },
792            context: Context::None,
793            columns: vec![],
794            exprs: vec![
795                LogExpr::ScalarFunc {
796                    name: "date_bin".to_string(),
797                    args: vec![
798                        LogExpr::Literal("30 seconds".to_string()),
799                        LogExpr::NamedIdent("timestamp".to_string()),
800                    ],
801                    alias: Some("2__date_histogram__time_bucket".to_string()),
802                },
803                LogExpr::AggrFunc {
804                    name: "count".to_string(),
805                    args: vec![LogExpr::PositionalIdent(0)],
806                    by: vec![LogExpr::NamedIdent(
807                        "2__date_histogram__time_bucket".to_string(),
808                    )],
809                    range: None,
810                    alias: Some("count_result".to_string()),
811                },
812            ],
813        };
814
815        let plan = planner.query_to_plan(log_query).await.unwrap();
816        let expected = "Aggregate: groupBy=[[2__date_histogram__time_bucket]], aggr=[[count(2__date_histogram__time_bucket) AS count_result]] [2__date_histogram__time_bucket:Timestamp(Nanosecond, None);N, count_result:Int64]\
817\n  Projection: date_bin(Utf8(\"30 seconds\"), greptime.public.test_table.timestamp) AS 2__date_histogram__time_bucket [2__date_histogram__time_bucket:Timestamp(Nanosecond, None);N]\
818\n    Limit: skip=0, fetch=1000 [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N]\
819\n      Filter: greptime.public.test_table.timestamp >= Utf8(\"2021-01-01T00:00:00Z\") AND greptime.public.test_table.timestamp <= Utf8(\"2021-01-02T00:00:00Z\") [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N]\
820\n        TableScan: greptime.public.test_table [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N]";
821
822        assert_eq!(plan.display_indent_schema().to_string(), expected);
823    }
824
825    #[tokio::test]
826    async fn test_build_compound_filter() {
827        let table_provider =
828            build_test_table_provider(&[("public".to_string(), "test_table".to_string())]).await;
829        let session_state = SessionStateBuilder::new().with_default_features().build();
830        let planner = LogQueryPlanner::new(table_provider, session_state);
831
832        // Test AND compound
833        let filter = ContentFilter::Compound(
834            vec![
835                ContentFilter::Contains("error".to_string()),
836                ContentFilter::Prefix("WARN".to_string()),
837            ],
838            BinaryOperator::And,
839        );
840        let expr = planner.build_content_filter("message", &filter).unwrap();
841
842        let expected_expr = col("message")
843            .like(lit(ScalarValue::Utf8(Some("%error%".to_string()))))
844            .and(col("message").like(lit(ScalarValue::Utf8(Some("WARN%".to_string())))));
845
846        assert_eq!(format!("{:?}", expr), format!("{:?}", expected_expr));
847
848        // Test OR compound
849        let filter = ContentFilter::Compound(
850            vec![
851                ContentFilter::Contains("error".to_string()),
852                ContentFilter::Prefix("WARN".to_string()),
853            ],
854            BinaryOperator::Or,
855        );
856        let expr = planner.build_content_filter("message", &filter).unwrap();
857
858        let expected_expr = col("message")
859            .like(lit(ScalarValue::Utf8(Some("%error%".to_string()))))
860            .or(col("message").like(lit(ScalarValue::Utf8(Some("WARN%".to_string())))));
861
862        assert_eq!(format!("{:?}", expr), format!("{:?}", expected_expr));
863
864        // Test nested compound
865        let filter = ContentFilter::Compound(
866            vec![
867                ContentFilter::Contains("error".to_string()),
868                ContentFilter::Compound(
869                    vec![
870                        ContentFilter::Prefix("WARN".to_string()),
871                        ContentFilter::Exact("DEBUG".to_string()),
872                    ],
873                    BinaryOperator::Or,
874                ),
875            ],
876            BinaryOperator::And,
877        );
878        let expr = planner.build_content_filter("message", &filter).unwrap();
879
880        let expected_nested = col("message")
881            .like(lit(ScalarValue::Utf8(Some("WARN%".to_string()))))
882            .or(col("message").like(lit(ScalarValue::Utf8(Some("DEBUG".to_string())))));
883        let expected_expr = col("message")
884            .like(lit(ScalarValue::Utf8(Some("%error%".to_string()))))
885            .and(expected_nested);
886
887        assert_eq!(format!("{:?}", expr), format!("{:?}", expected_expr));
888    }
889
890    #[tokio::test]
891    async fn test_build_great_than_filter() {
892        let table_provider =
893            build_test_table_provider(&[("public".to_string(), "test_table".to_string())]).await;
894        let session_state = SessionStateBuilder::new().with_default_features().build();
895        let planner = LogQueryPlanner::new(table_provider, session_state);
896
897        // Test GreatThan with inclusive=true
898        let column_filter = ColumnFilters {
899            column_name: "message".to_string(),
900            filters: vec![ContentFilter::GreatThan {
901                value: "error".to_string(),
902                inclusive: true,
903            }],
904        };
905
906        let expr_option = planner.build_column_filter(&column_filter).unwrap();
907        assert!(expr_option.is_some());
908
909        let expr = expr_option.unwrap();
910        let expected_expr = col("message").gt_eq(lit(ScalarValue::Utf8(Some("error".to_string()))));
911
912        assert_eq!(format!("{:?}", expr), format!("{:?}", expected_expr));
913
914        // Test GreatThan with inclusive=false
915        let column_filter = ColumnFilters {
916            column_name: "message".to_string(),
917            filters: vec![ContentFilter::GreatThan {
918                value: "error".to_string(),
919                inclusive: false,
920            }],
921        };
922
923        let expr_option = planner.build_column_filter(&column_filter).unwrap();
924        assert!(expr_option.is_some());
925
926        let expr = expr_option.unwrap();
927        let expected_expr = col("message").gt(lit(ScalarValue::Utf8(Some("error".to_string()))));
928
929        assert_eq!(format!("{:?}", expr), format!("{:?}", expected_expr));
930    }
931
932    #[tokio::test]
933    async fn test_build_less_than_filter() {
934        let table_provider =
935            build_test_table_provider(&[("public".to_string(), "test_table".to_string())]).await;
936        let session_state = SessionStateBuilder::new().with_default_features().build();
937        let planner = LogQueryPlanner::new(table_provider, session_state);
938
939        // Test LessThan with inclusive=true
940        let column_filter = ColumnFilters {
941            column_name: "message".to_string(),
942            filters: vec![ContentFilter::LessThan {
943                value: "error".to_string(),
944                inclusive: true,
945            }],
946        };
947
948        let expr_option = planner.build_column_filter(&column_filter).unwrap();
949        assert!(expr_option.is_some());
950
951        let expr = expr_option.unwrap();
952        let expected_expr = col("message").lt_eq(lit(ScalarValue::Utf8(Some("error".to_string()))));
953
954        assert_eq!(format!("{:?}", expr), format!("{:?}", expected_expr));
955
956        // Test LessThan with inclusive=false
957        let column_filter = ColumnFilters {
958            column_name: "message".to_string(),
959            filters: vec![ContentFilter::LessThan {
960                value: "error".to_string(),
961                inclusive: false,
962            }],
963        };
964
965        let expr_option = planner.build_column_filter(&column_filter).unwrap();
966        assert!(expr_option.is_some());
967
968        let expr = expr_option.unwrap();
969        let expected_expr = col("message").lt(lit(ScalarValue::Utf8(Some("error".to_string()))));
970
971        assert_eq!(format!("{:?}", expr), format!("{:?}", expected_expr));
972    }
973
974    #[tokio::test]
975    async fn test_build_in_filter() {
976        let table_provider =
977            build_test_table_provider(&[("public".to_string(), "test_table".to_string())]).await;
978        let session_state = SessionStateBuilder::new().with_default_features().build();
979        let planner = LogQueryPlanner::new(table_provider, session_state);
980
981        // Test In filter with multiple values
982        let column_filter = ColumnFilters {
983            column_name: "message".to_string(),
984            filters: vec![ContentFilter::In(vec![
985                "error".to_string(),
986                "warning".to_string(),
987                "info".to_string(),
988            ])],
989        };
990
991        let expr_option = planner.build_column_filter(&column_filter).unwrap();
992        assert!(expr_option.is_some());
993
994        let expr = expr_option.unwrap();
995        let expected_expr = col("message").in_list(
996            vec![
997                lit(ScalarValue::Utf8(Some("error".to_string()))),
998                lit(ScalarValue::Utf8(Some("warning".to_string()))),
999                lit(ScalarValue::Utf8(Some("info".to_string()))),
1000            ],
1001            false,
1002        );
1003
1004        assert_eq!(format!("{:?}", expr), format!("{:?}", expected_expr));
1005    }
1006}