1use catalog::table_source::DfTableSourceProvider;
16use common_function::utils::escape_like_pattern;
17use datafusion::datasource::DefaultTableSource;
18use datafusion::execution::SessionState;
19use datafusion_common::{DFSchema, ScalarValue};
20use datafusion_expr::utils::conjunction;
21use datafusion_expr::{col, lit, Expr, LogicalPlan, LogicalPlanBuilder};
22use datafusion_sql::TableReference;
23use datatypes::schema::Schema;
24use log_query::{ColumnFilters, LogExpr, LogQuery, TimeFilter};
25use snafu::{OptionExt, ResultExt};
26use table::table::adapter::DfTableProviderAdapter;
27
28use crate::log_query::error::{
29 CatalogSnafu, DataFusionPlanningSnafu, Result, TimeIndexNotFoundSnafu, UnexpectedLogExprSnafu,
30 UnimplementedSnafu, UnknownAggregateFunctionSnafu, UnknownScalarFunctionSnafu,
31 UnknownTableSnafu,
32};
33
/// Number of rows fetched when a query leaves `limit.fetch` unset.
const DEFAULT_LIMIT: usize = 1000;
35
/// Planner that turns a [`LogQuery`] into a DataFusion [`LogicalPlan`].
pub struct LogQueryPlanner {
    /// Resolves the queried table reference to a table source.
    table_provider: DfTableSourceProvider,
    /// Session state consulted when looking up aggregate and scalar functions by name.
    session_state: SessionState,
}
40
41impl LogQueryPlanner {
42 pub fn new(table_provider: DfTableSourceProvider, session_state: SessionState) -> Self {
43 Self {
44 table_provider,
45 session_state,
46 }
47 }
48
49 pub async fn query_to_plan(&mut self, query: LogQuery) -> Result<LogicalPlan> {
50 let table_ref: TableReference = query.table.table_ref().into();
52 let table_source = self
53 .table_provider
54 .resolve_table(table_ref.clone())
55 .await
56 .context(CatalogSnafu)?;
57 let schema = table_source
58 .as_any()
59 .downcast_ref::<DefaultTableSource>()
60 .context(UnknownTableSnafu)?
61 .table_provider
62 .as_any()
63 .downcast_ref::<DfTableProviderAdapter>()
64 .context(UnknownTableSnafu)?
65 .table()
66 .schema();
67
68 let mut plan_builder = LogicalPlanBuilder::scan(table_ref, table_source, None)
70 .context(DataFusionPlanningSnafu)?;
71
72 let mut filters = Vec::new();
74
75 filters.push(self.build_time_filter(&query.time_filter, &schema)?);
77
78 for column_filter in &query.filters {
80 if let Some(expr) = self.build_column_filter(column_filter)? {
81 filters.push(expr);
82 }
83 }
84
85 if !filters.is_empty() {
87 let filter_expr = filters.into_iter().reduce(|a, b| a.and(b)).unwrap();
88 plan_builder = plan_builder
89 .filter(filter_expr)
90 .context(DataFusionPlanningSnafu)?;
91 }
92
93 if !query.columns.is_empty() {
95 let projected_columns = query.columns.iter().map(col).collect::<Vec<_>>();
96 plan_builder = plan_builder
97 .project(projected_columns)
98 .context(DataFusionPlanningSnafu)?;
99 }
100
101 plan_builder = plan_builder
103 .limit(
104 query.limit.skip.unwrap_or(0),
105 Some(query.limit.fetch.unwrap_or(DEFAULT_LIMIT)),
106 )
107 .context(DataFusionPlanningSnafu)?;
108
109 for expr in &query.exprs {
111 plan_builder = self.process_log_expr(plan_builder, expr)?;
112 }
113
114 let plan = plan_builder.build().context(DataFusionPlanningSnafu)?;
116
117 Ok(plan)
118 }
119
120 fn build_time_filter(&self, time_filter: &TimeFilter, schema: &Schema) -> Result<Expr> {
121 let timestamp_col = schema
122 .timestamp_column()
123 .with_context(|| TimeIndexNotFoundSnafu {})?
124 .name
125 .clone();
126
127 let start_time = ScalarValue::Utf8(time_filter.start.clone());
128 let end_time = ScalarValue::Utf8(
129 time_filter
130 .end
131 .clone()
132 .or(Some("9999-12-31T23:59:59Z".to_string())),
133 );
134 let expr = col(timestamp_col.clone())
135 .gt_eq(lit(start_time))
136 .and(col(timestamp_col).lt_eq(lit(end_time)));
137
138 Ok(expr)
139 }
140
141 fn build_column_filter(&self, column_filter: &ColumnFilters) -> Result<Option<Expr>> {
143 if column_filter.filters.is_empty() {
144 return Ok(None);
145 }
146
147 self.build_content_filters(&column_filter.column_name, &column_filter.filters)
148 }
149
150 fn build_content_filters(
152 &self,
153 column_name: &str,
154 filters: &[log_query::ContentFilter],
155 ) -> Result<Option<Expr>> {
156 if filters.is_empty() {
157 return Ok(None);
158 }
159
160 let exprs = filters
161 .iter()
162 .map(|filter| self.build_content_filter(column_name, filter))
163 .try_collect::<Vec<_>>()?;
164
165 Ok(conjunction(exprs))
166 }
167
168 #[allow(clippy::only_used_in_recursion)]
170 fn build_content_filter(
171 &self,
172 column_name: &str,
173 filter: &log_query::ContentFilter,
174 ) -> Result<Expr> {
175 match filter {
176 log_query::ContentFilter::Exact(pattern) => {
177 Ok(col(column_name)
178 .like(lit(ScalarValue::Utf8(Some(escape_like_pattern(pattern))))))
179 }
180 log_query::ContentFilter::Prefix(pattern) => Ok(col(column_name).like(lit(
181 ScalarValue::Utf8(Some(format!("{}%", escape_like_pattern(pattern)))),
182 ))),
183 log_query::ContentFilter::Postfix(pattern) => Ok(col(column_name).like(lit(
184 ScalarValue::Utf8(Some(format!("%{}", escape_like_pattern(pattern)))),
185 ))),
186 log_query::ContentFilter::Contains(pattern) => Ok(col(column_name).like(lit(
187 ScalarValue::Utf8(Some(format!("%{}%", escape_like_pattern(pattern)))),
188 ))),
189 log_query::ContentFilter::Regex(..) => Err::<Expr, _>(
190 UnimplementedSnafu {
191 feature: "regex filter",
192 }
193 .build(),
194 ),
195 log_query::ContentFilter::Exist => Ok(col(column_name).is_not_null()),
196 log_query::ContentFilter::Between {
197 start,
198 end,
199 start_inclusive,
200 end_inclusive,
201 } => {
202 let left = if *start_inclusive {
203 Expr::gt_eq
204 } else {
205 Expr::gt
206 };
207 let right = if *end_inclusive {
208 Expr::lt_eq
209 } else {
210 Expr::lt
211 };
212 Ok(left(
213 col(column_name),
214 lit(ScalarValue::Utf8(Some(escape_like_pattern(start)))),
215 )
216 .and(right(
217 col(column_name),
218 lit(ScalarValue::Utf8(Some(escape_like_pattern(end)))),
219 )))
220 }
221 log_query::ContentFilter::GreatThan { value, inclusive } => {
222 let expr = if *inclusive { Expr::gt_eq } else { Expr::gt };
223 Ok(expr(
224 col(column_name),
225 lit(ScalarValue::Utf8(Some(escape_like_pattern(value)))),
226 ))
227 }
228 log_query::ContentFilter::LessThan { value, inclusive } => {
229 let expr = if *inclusive { Expr::lt_eq } else { Expr::lt };
230 Ok(expr(
231 col(column_name),
232 lit(ScalarValue::Utf8(Some(escape_like_pattern(value)))),
233 ))
234 }
235 log_query::ContentFilter::In(values) => {
236 let list = values
237 .iter()
238 .map(|value| lit(ScalarValue::Utf8(Some(escape_like_pattern(value)))))
239 .collect();
240 Ok(col(column_name).in_list(list, false))
241 }
242 log_query::ContentFilter::Compound(filters, op) => {
243 let exprs = filters
244 .iter()
245 .map(|filter| self.build_content_filter(column_name, filter))
246 .try_collect::<Vec<_>>()?;
247
248 match op {
249 log_query::BinaryOperator::And => Ok(conjunction(exprs).unwrap()),
250 log_query::BinaryOperator::Or => {
251 Ok(exprs.into_iter().reduce(|a, b| a.or(b)).unwrap())
253 }
254 }
255 }
256 }
257 }
258
259 fn build_aggr_func(
260 &self,
261 schema: &DFSchema,
262 fn_name: &str,
263 args: &[LogExpr],
264 by: &[LogExpr],
265 ) -> Result<(Expr, Vec<Expr>)> {
266 let aggr_fn = self
267 .session_state
268 .aggregate_functions()
269 .get(fn_name)
270 .context(UnknownAggregateFunctionSnafu {
271 name: fn_name.to_string(),
272 })?;
273 let args = args
274 .iter()
275 .map(|expr| self.log_expr_to_column_expr(expr, schema))
276 .try_collect::<Vec<_>>()?;
277 let group_exprs = by
278 .iter()
279 .map(|expr| self.log_expr_to_column_expr(expr, schema))
280 .try_collect::<Vec<_>>()?;
281 let aggr_expr = aggr_fn.call(args);
282
283 Ok((aggr_expr, group_exprs))
284 }
285
286 fn log_expr_to_column_expr(&self, expr: &LogExpr, schema: &DFSchema) -> Result<Expr> {
291 match expr {
292 LogExpr::NamedIdent(name) => Ok(col(name)),
293 LogExpr::PositionalIdent(index) => Ok(col(schema.field(*index).name())),
294 LogExpr::Literal(literal) => Ok(lit(ScalarValue::Utf8(Some(literal.clone())))),
295 _ => UnexpectedLogExprSnafu {
296 expr: expr.clone(),
297 expected: "named identifier, positional identifier, or literal",
298 }
299 .fail(),
300 }
301 }
302
303 fn build_scalar_func(&self, schema: &DFSchema, name: &str, args: &[LogExpr]) -> Result<Expr> {
304 let args = args
305 .iter()
306 .map(|expr| self.log_expr_to_column_expr(expr, schema))
307 .try_collect::<Vec<_>>()?;
308 let func = self.session_state.scalar_functions().get(name).context(
309 UnknownScalarFunctionSnafu {
310 name: name.to_string(),
311 },
312 )?;
313 let expr = func.call(args);
314
315 Ok(expr)
316 }
317
318 fn process_log_expr(
322 &self,
323 plan_builder: LogicalPlanBuilder,
324 expr: &LogExpr,
325 ) -> Result<LogicalPlanBuilder> {
326 let mut plan_builder = plan_builder;
327
328 match expr {
329 LogExpr::AggrFunc {
330 name,
331 args,
332 by,
333 range: _range,
334 alias,
335 } => {
336 let schema = plan_builder.schema();
337 let (mut aggr_expr, group_exprs) = self.build_aggr_func(schema, name, args, by)?;
338 if let Some(alias) = alias {
339 aggr_expr = aggr_expr.alias(alias);
340 }
341
342 plan_builder = plan_builder
343 .aggregate(group_exprs, [aggr_expr.clone()])
344 .context(DataFusionPlanningSnafu)?;
345 }
346 LogExpr::Filter { expr, filter } => {
347 let schema = plan_builder.schema();
348 let expr = self.log_expr_to_column_expr(expr, schema)?;
349
350 let col_name = expr.schema_name().to_string();
351 let filter_expr = self.build_content_filter(&col_name, filter)?;
352 plan_builder = plan_builder
353 .filter(filter_expr)
354 .context(DataFusionPlanningSnafu)?;
355 }
356 LogExpr::ScalarFunc { name, args, alias } => {
357 let schema = plan_builder.schema();
358 let mut expr = self.build_scalar_func(schema, name, args)?;
359 if let Some(alias) = alias {
360 expr = expr.alias(alias);
361 }
362 plan_builder = plan_builder
363 .project([expr.clone()])
364 .context(DataFusionPlanningSnafu)?;
365 }
366 LogExpr::NamedIdent(_) | LogExpr::PositionalIdent(_) => {
367 }
369 LogExpr::Alias { expr, alias } => {
370 let expr = self.log_expr_to_column_expr(expr, plan_builder.schema())?;
371 let aliased_expr = expr.alias(alias);
372 plan_builder = plan_builder
373 .project([aliased_expr.clone()])
374 .context(DataFusionPlanningSnafu)?;
375 }
376 _ => {
377 UnimplementedSnafu {
378 feature: "log expression",
379 }
380 .fail()?;
381 }
382 }
383 Ok(plan_builder)
384 }
385}
386
387#[cfg(test)]
388mod tests {
389 use std::sync::Arc;
390
391 use catalog::memory::MemoryCatalogManager;
392 use catalog::RegisterTableRequest;
393 use common_catalog::consts::DEFAULT_CATALOG_NAME;
394 use common_query::test_util::DummyDecoder;
395 use datafusion::execution::SessionStateBuilder;
396 use datatypes::prelude::ConcreteDataType;
397 use datatypes::schema::{ColumnSchema, SchemaRef};
398 use log_query::{BinaryOperator, ContentFilter, Context, Limit};
399 use session::context::QueryContext;
400 use table::metadata::{TableInfoBuilder, TableMetaBuilder};
401 use table::table_name::TableName;
402 use table::test_util::EmptyTable;
403
404 use super::*;
405
406 fn mock_schema() -> SchemaRef {
407 let columns = vec![
408 ColumnSchema::new(
409 "message".to_string(),
410 ConcreteDataType::string_datatype(),
411 false,
412 ),
413 ColumnSchema::new(
414 "timestamp".to_string(),
415 ConcreteDataType::timestamp_millisecond_datatype(),
416 false,
417 )
418 .with_time_index(true),
419 ColumnSchema::new(
420 "host".to_string(),
421 ConcreteDataType::string_datatype(),
422 true,
423 ),
424 ];
425
426 Arc::new(Schema::new(columns))
427 }
428
429 async fn build_test_table_provider(
431 table_name_tuples: &[(String, String)],
432 ) -> DfTableSourceProvider {
433 let catalog_list = MemoryCatalogManager::with_default_setup();
434 for (schema_name, table_name) in table_name_tuples {
435 let schema = mock_schema();
436 let table_meta = TableMetaBuilder::empty()
437 .schema(schema)
438 .primary_key_indices(vec![2])
439 .value_indices(vec![0])
440 .next_column_id(1024)
441 .build()
442 .unwrap();
443 let table_info = TableInfoBuilder::default()
444 .name(table_name.to_string())
445 .meta(table_meta)
446 .build()
447 .unwrap();
448 let table = EmptyTable::from_table_info(&table_info);
449
450 catalog_list
451 .register_table_sync(RegisterTableRequest {
452 catalog: DEFAULT_CATALOG_NAME.to_string(),
453 schema: schema_name.to_string(),
454 table_name: table_name.to_string(),
455 table_id: 1024,
456 table,
457 })
458 .unwrap();
459 }
460
461 DfTableSourceProvider::new(
462 catalog_list,
463 false,
464 QueryContext::arc(),
465 DummyDecoder::arc(),
466 false,
467 )
468 }
469
470 #[tokio::test]
471 async fn test_query_to_plan() {
472 let table_provider =
473 build_test_table_provider(&[("public".to_string(), "test_table".to_string())]).await;
474 let session_state = SessionStateBuilder::new().with_default_features().build();
475 let mut planner = LogQueryPlanner::new(table_provider, session_state);
476
477 let log_query = LogQuery {
478 table: TableName::new(DEFAULT_CATALOG_NAME, "public", "test_table"),
479 time_filter: TimeFilter {
480 start: Some("2021-01-01T00:00:00Z".to_string()),
481 end: Some("2021-01-02T00:00:00Z".to_string()),
482 span: None,
483 },
484 filters: vec![ColumnFilters {
485 column_name: "message".to_string(),
486 filters: vec![ContentFilter::Contains("error".to_string())],
487 }],
488 limit: Limit {
489 skip: None,
490 fetch: Some(100),
491 },
492 context: Context::None,
493 columns: vec![],
494 exprs: vec![],
495 };
496
497 let plan = planner.query_to_plan(log_query).await.unwrap();
498 let expected = "Limit: skip=0, fetch=100 [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N]\
499\n Filter: greptime.public.test_table.timestamp >= Utf8(\"2021-01-01T00:00:00Z\") AND greptime.public.test_table.timestamp <= Utf8(\"2021-01-02T00:00:00Z\") AND greptime.public.test_table.message LIKE Utf8(\"%error%\") [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N]\
500\n TableScan: greptime.public.test_table [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N]";
501
502 assert_eq!(plan.display_indent_schema().to_string(), expected);
503 }
504
505 #[tokio::test]
506 async fn test_build_time_filter() {
507 let table_provider =
508 build_test_table_provider(&[("public".to_string(), "test_table".to_string())]).await;
509 let session_state = SessionStateBuilder::new().with_default_features().build();
510 let planner = LogQueryPlanner::new(table_provider, session_state);
511
512 let time_filter = TimeFilter {
513 start: Some("2021-01-01T00:00:00Z".to_string()),
514 end: Some("2021-01-02T00:00:00Z".to_string()),
515 span: None,
516 };
517
518 let expr = planner
519 .build_time_filter(&time_filter, &mock_schema())
520 .unwrap();
521
522 let expected_expr = col("timestamp")
523 .gt_eq(lit(ScalarValue::Utf8(Some(
524 "2021-01-01T00:00:00Z".to_string(),
525 ))))
526 .and(col("timestamp").lt_eq(lit(ScalarValue::Utf8(Some(
527 "2021-01-02T00:00:00Z".to_string(),
528 )))));
529
530 assert_eq!(format!("{:?}", expr), format!("{:?}", expected_expr));
531 }
532
533 #[tokio::test]
534 async fn test_build_time_filter_without_end() {
535 let table_provider =
536 build_test_table_provider(&[("public".to_string(), "test_table".to_string())]).await;
537 let session_state = SessionStateBuilder::new().with_default_features().build();
538 let planner = LogQueryPlanner::new(table_provider, session_state);
539
540 let time_filter = TimeFilter {
541 start: Some("2021-01-01T00:00:00Z".to_string()),
542 end: None,
543 span: None,
544 };
545
546 let expr = planner
547 .build_time_filter(&time_filter, &mock_schema())
548 .unwrap();
549
550 let expected_expr = col("timestamp")
551 .gt_eq(lit(ScalarValue::Utf8(Some(
552 "2021-01-01T00:00:00Z".to_string(),
553 ))))
554 .and(col("timestamp").lt_eq(lit(ScalarValue::Utf8(Some(
555 "9999-12-31T23:59:59Z".to_string(),
556 )))));
557
558 assert_eq!(format!("{:?}", expr), format!("{:?}", expected_expr));
559 }
560
561 #[tokio::test]
562 async fn test_build_column_filter() {
563 let table_provider =
564 build_test_table_provider(&[("public".to_string(), "test_table".to_string())]).await;
565 let session_state = SessionStateBuilder::new().with_default_features().build();
566 let planner = LogQueryPlanner::new(table_provider, session_state);
567
568 let column_filter = ColumnFilters {
569 column_name: "message".to_string(),
570 filters: vec![
571 ContentFilter::Contains("error".to_string()),
572 ContentFilter::Prefix("WARN".to_string()),
573 ],
574 };
575
576 let expr_option = planner.build_column_filter(&column_filter).unwrap();
577 assert!(expr_option.is_some());
578
579 let expr = expr_option.unwrap();
580
581 let expected_expr = col("message")
582 .like(lit(ScalarValue::Utf8(Some("%error%".to_string()))))
583 .and(col("message").like(lit(ScalarValue::Utf8(Some("WARN%".to_string())))));
584
585 assert_eq!(format!("{:?}", expr), format!("{:?}", expected_expr));
586 }
587
588 #[tokio::test]
589 async fn test_query_to_plan_with_only_skip() {
590 let table_provider =
591 build_test_table_provider(&[("public".to_string(), "test_table".to_string())]).await;
592 let session_state = SessionStateBuilder::new().with_default_features().build();
593 let mut planner = LogQueryPlanner::new(table_provider, session_state);
594
595 let log_query = LogQuery {
596 table: TableName::new(DEFAULT_CATALOG_NAME, "public", "test_table"),
597 time_filter: TimeFilter {
598 start: Some("2021-01-01T00:00:00Z".to_string()),
599 end: Some("2021-01-02T00:00:00Z".to_string()),
600 span: None,
601 },
602 filters: vec![ColumnFilters {
603 column_name: "message".to_string(),
604 filters: vec![ContentFilter::Contains("error".to_string())],
605 }],
606 limit: Limit {
607 skip: Some(10),
608 fetch: None,
609 },
610 context: Context::None,
611 columns: vec![],
612 exprs: vec![],
613 };
614
615 let plan = planner.query_to_plan(log_query).await.unwrap();
616 let expected = "Limit: skip=10, fetch=1000 [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N]\
617\n Filter: greptime.public.test_table.timestamp >= Utf8(\"2021-01-01T00:00:00Z\") AND greptime.public.test_table.timestamp <= Utf8(\"2021-01-02T00:00:00Z\") AND greptime.public.test_table.message LIKE Utf8(\"%error%\") [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N]\
618\n TableScan: greptime.public.test_table [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N]";
619
620 assert_eq!(plan.display_indent_schema().to_string(), expected);
621 }
622
623 #[tokio::test]
624 async fn test_query_to_plan_without_limit() {
625 let table_provider =
626 build_test_table_provider(&[("public".to_string(), "test_table".to_string())]).await;
627 let session_state = SessionStateBuilder::new().with_default_features().build();
628 let mut planner = LogQueryPlanner::new(table_provider, session_state);
629
630 let log_query = LogQuery {
631 table: TableName::new(DEFAULT_CATALOG_NAME, "public", "test_table"),
632 time_filter: TimeFilter {
633 start: Some("2021-01-01T00:00:00Z".to_string()),
634 end: Some("2021-01-02T00:00:00Z".to_string()),
635 span: None,
636 },
637 filters: vec![ColumnFilters {
638 column_name: "message".to_string(),
639 filters: vec![ContentFilter::Contains("error".to_string())],
640 }],
641 limit: Limit {
642 skip: None,
643 fetch: None,
644 },
645 context: Context::None,
646 columns: vec![],
647 exprs: vec![],
648 };
649
650 let plan = planner.query_to_plan(log_query).await.unwrap();
651 let expected = "Limit: skip=0, fetch=1000 [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N]\
652\n Filter: greptime.public.test_table.timestamp >= Utf8(\"2021-01-01T00:00:00Z\") AND greptime.public.test_table.timestamp <= Utf8(\"2021-01-02T00:00:00Z\") AND greptime.public.test_table.message LIKE Utf8(\"%error%\") [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N]\
653\n TableScan: greptime.public.test_table [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N]";
654
655 assert_eq!(plan.display_indent_schema().to_string(), expected);
656 }
657
658 #[test]
659 fn test_escape_pattern() {
660 assert_eq!(escape_like_pattern("test"), "test");
661 assert_eq!(escape_like_pattern("te%st"), "te\\%st");
662 assert_eq!(escape_like_pattern("te_st"), "te\\_st");
663 assert_eq!(escape_like_pattern("te\\st"), "te\\\\st");
664 }
665
666 #[tokio::test]
667 async fn test_query_to_plan_with_aggr_func() {
668 let table_provider =
669 build_test_table_provider(&[("public".to_string(), "test_table".to_string())]).await;
670 let session_state = SessionStateBuilder::new().with_default_features().build();
671 let mut planner = LogQueryPlanner::new(table_provider, session_state);
672
673 let log_query = LogQuery {
674 table: TableName::new(DEFAULT_CATALOG_NAME, "public", "test_table"),
675 time_filter: TimeFilter {
676 start: Some("2021-01-01T00:00:00Z".to_string()),
677 end: Some("2021-01-02T00:00:00Z".to_string()),
678 span: None,
679 },
680 filters: vec![],
681 limit: Limit {
682 skip: None,
683 fetch: Some(100),
684 },
685 context: Context::None,
686 columns: vec![],
687 exprs: vec![LogExpr::AggrFunc {
688 name: "count".to_string(),
689 args: vec![LogExpr::NamedIdent("message".to_string())],
690 by: vec![LogExpr::NamedIdent("host".to_string())],
691 range: None,
692 alias: Some("count_result".to_string()),
693 }],
694 };
695
696 let plan = planner.query_to_plan(log_query).await.unwrap();
697 let expected = "Aggregate: groupBy=[[greptime.public.test_table.host]], aggr=[[count(greptime.public.test_table.message) AS count_result]] [host:Utf8;N, count_result:Int64]\
698\n Limit: skip=0, fetch=100 [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N]\
699\n Filter: greptime.public.test_table.timestamp >= Utf8(\"2021-01-01T00:00:00Z\") AND greptime.public.test_table.timestamp <= Utf8(\"2021-01-02T00:00:00Z\") [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N]\
700\n TableScan: greptime.public.test_table [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N]";
701
702 assert_eq!(plan.display_indent_schema().to_string(), expected);
703 }
704
705 #[tokio::test]
706 async fn test_query_to_plan_with_scalar_func() {
707 let table_provider =
708 build_test_table_provider(&[("public".to_string(), "test_table".to_string())]).await;
709 let session_state = SessionStateBuilder::new().with_default_features().build();
710 let mut planner = LogQueryPlanner::new(table_provider, session_state);
711
712 let log_query = LogQuery {
713 table: TableName::new(DEFAULT_CATALOG_NAME, "public", "test_table"),
714 time_filter: TimeFilter {
715 start: Some("2021-01-01T00:00:00Z".to_string()),
716 end: Some("2021-01-02T00:00:00Z".to_string()),
717 span: None,
718 },
719 filters: vec![],
720 limit: Limit {
721 skip: None,
722 fetch: Some(100),
723 },
724 context: Context::None,
725 columns: vec![],
726 exprs: vec![LogExpr::ScalarFunc {
727 name: "date_trunc".to_string(),
728 args: vec![
729 LogExpr::NamedIdent("timestamp".to_string()),
730 LogExpr::Literal("day".to_string()),
731 ],
732 alias: Some("time_bucket".to_string()),
733 }],
734 };
735
736 let plan = planner.query_to_plan(log_query).await.unwrap();
737 let expected = "Projection: date_trunc(greptime.public.test_table.timestamp, Utf8(\"day\")) AS time_bucket [time_bucket:Timestamp(Nanosecond, None);N]\
738 \n Limit: skip=0, fetch=100 [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N]\
739 \n Filter: greptime.public.test_table.timestamp >= Utf8(\"2021-01-01T00:00:00Z\") AND greptime.public.test_table.timestamp <= Utf8(\"2021-01-02T00:00:00Z\") [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N]\
740 \n TableScan: greptime.public.test_table [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N]";
741
742 assert_eq!(plan.display_indent_schema().to_string(), expected);
743 }
744
745 #[tokio::test]
746 async fn test_build_column_filter_between() {
747 let table_provider =
748 build_test_table_provider(&[("public".to_string(), "test_table".to_string())]).await;
749 let session_state = SessionStateBuilder::new().with_default_features().build();
750 let planner = LogQueryPlanner::new(table_provider, session_state);
751
752 let column_filter = ColumnFilters {
753 column_name: "message".to_string(),
754 filters: vec![ContentFilter::Between {
755 start: "a".to_string(),
756 end: "z".to_string(),
757 start_inclusive: true,
758 end_inclusive: false,
759 }],
760 };
761
762 let expr_option = planner.build_column_filter(&column_filter).unwrap();
763 assert!(expr_option.is_some());
764
765 let expr = expr_option.unwrap();
766 let expected_expr = col("message")
767 .gt_eq(lit(ScalarValue::Utf8(Some("a".to_string()))))
768 .and(col("message").lt(lit(ScalarValue::Utf8(Some("z".to_string())))));
769
770 assert_eq!(format!("{:?}", expr), format!("{:?}", expected_expr));
771 }
772
773 #[tokio::test]
774 async fn test_query_to_plan_with_date_histogram() {
775 let table_provider =
776 build_test_table_provider(&[("public".to_string(), "test_table".to_string())]).await;
777 let session_state = SessionStateBuilder::new().with_default_features().build();
778 let mut planner = LogQueryPlanner::new(table_provider, session_state);
779
780 let log_query = LogQuery {
781 table: TableName::new(DEFAULT_CATALOG_NAME, "public", "test_table"),
782 time_filter: TimeFilter {
783 start: Some("2021-01-01T00:00:00Z".to_string()),
784 end: Some("2021-01-02T00:00:00Z".to_string()),
785 span: None,
786 },
787 filters: vec![],
788 limit: Limit {
789 skip: Some(0),
790 fetch: None,
791 },
792 context: Context::None,
793 columns: vec![],
794 exprs: vec![
795 LogExpr::ScalarFunc {
796 name: "date_bin".to_string(),
797 args: vec![
798 LogExpr::Literal("30 seconds".to_string()),
799 LogExpr::NamedIdent("timestamp".to_string()),
800 ],
801 alias: Some("2__date_histogram__time_bucket".to_string()),
802 },
803 LogExpr::AggrFunc {
804 name: "count".to_string(),
805 args: vec![LogExpr::PositionalIdent(0)],
806 by: vec![LogExpr::NamedIdent(
807 "2__date_histogram__time_bucket".to_string(),
808 )],
809 range: None,
810 alias: Some("count_result".to_string()),
811 },
812 ],
813 };
814
815 let plan = planner.query_to_plan(log_query).await.unwrap();
816 let expected = "Aggregate: groupBy=[[2__date_histogram__time_bucket]], aggr=[[count(2__date_histogram__time_bucket) AS count_result]] [2__date_histogram__time_bucket:Timestamp(Nanosecond, None);N, count_result:Int64]\
817\n Projection: date_bin(Utf8(\"30 seconds\"), greptime.public.test_table.timestamp) AS 2__date_histogram__time_bucket [2__date_histogram__time_bucket:Timestamp(Nanosecond, None);N]\
818\n Limit: skip=0, fetch=1000 [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N]\
819\n Filter: greptime.public.test_table.timestamp >= Utf8(\"2021-01-01T00:00:00Z\") AND greptime.public.test_table.timestamp <= Utf8(\"2021-01-02T00:00:00Z\") [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N]\
820\n TableScan: greptime.public.test_table [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N]";
821
822 assert_eq!(plan.display_indent_schema().to_string(), expected);
823 }
824
825 #[tokio::test]
826 async fn test_build_compound_filter() {
827 let table_provider =
828 build_test_table_provider(&[("public".to_string(), "test_table".to_string())]).await;
829 let session_state = SessionStateBuilder::new().with_default_features().build();
830 let planner = LogQueryPlanner::new(table_provider, session_state);
831
832 let filter = ContentFilter::Compound(
834 vec![
835 ContentFilter::Contains("error".to_string()),
836 ContentFilter::Prefix("WARN".to_string()),
837 ],
838 BinaryOperator::And,
839 );
840 let expr = planner.build_content_filter("message", &filter).unwrap();
841
842 let expected_expr = col("message")
843 .like(lit(ScalarValue::Utf8(Some("%error%".to_string()))))
844 .and(col("message").like(lit(ScalarValue::Utf8(Some("WARN%".to_string())))));
845
846 assert_eq!(format!("{:?}", expr), format!("{:?}", expected_expr));
847
848 let filter = ContentFilter::Compound(
850 vec![
851 ContentFilter::Contains("error".to_string()),
852 ContentFilter::Prefix("WARN".to_string()),
853 ],
854 BinaryOperator::Or,
855 );
856 let expr = planner.build_content_filter("message", &filter).unwrap();
857
858 let expected_expr = col("message")
859 .like(lit(ScalarValue::Utf8(Some("%error%".to_string()))))
860 .or(col("message").like(lit(ScalarValue::Utf8(Some("WARN%".to_string())))));
861
862 assert_eq!(format!("{:?}", expr), format!("{:?}", expected_expr));
863
864 let filter = ContentFilter::Compound(
866 vec![
867 ContentFilter::Contains("error".to_string()),
868 ContentFilter::Compound(
869 vec![
870 ContentFilter::Prefix("WARN".to_string()),
871 ContentFilter::Exact("DEBUG".to_string()),
872 ],
873 BinaryOperator::Or,
874 ),
875 ],
876 BinaryOperator::And,
877 );
878 let expr = planner.build_content_filter("message", &filter).unwrap();
879
880 let expected_nested = col("message")
881 .like(lit(ScalarValue::Utf8(Some("WARN%".to_string()))))
882 .or(col("message").like(lit(ScalarValue::Utf8(Some("DEBUG".to_string())))));
883 let expected_expr = col("message")
884 .like(lit(ScalarValue::Utf8(Some("%error%".to_string()))))
885 .and(expected_nested);
886
887 assert_eq!(format!("{:?}", expr), format!("{:?}", expected_expr));
888 }
889
890 #[tokio::test]
891 async fn test_build_great_than_filter() {
892 let table_provider =
893 build_test_table_provider(&[("public".to_string(), "test_table".to_string())]).await;
894 let session_state = SessionStateBuilder::new().with_default_features().build();
895 let planner = LogQueryPlanner::new(table_provider, session_state);
896
897 let column_filter = ColumnFilters {
899 column_name: "message".to_string(),
900 filters: vec![ContentFilter::GreatThan {
901 value: "error".to_string(),
902 inclusive: true,
903 }],
904 };
905
906 let expr_option = planner.build_column_filter(&column_filter).unwrap();
907 assert!(expr_option.is_some());
908
909 let expr = expr_option.unwrap();
910 let expected_expr = col("message").gt_eq(lit(ScalarValue::Utf8(Some("error".to_string()))));
911
912 assert_eq!(format!("{:?}", expr), format!("{:?}", expected_expr));
913
914 let column_filter = ColumnFilters {
916 column_name: "message".to_string(),
917 filters: vec![ContentFilter::GreatThan {
918 value: "error".to_string(),
919 inclusive: false,
920 }],
921 };
922
923 let expr_option = planner.build_column_filter(&column_filter).unwrap();
924 assert!(expr_option.is_some());
925
926 let expr = expr_option.unwrap();
927 let expected_expr = col("message").gt(lit(ScalarValue::Utf8(Some("error".to_string()))));
928
929 assert_eq!(format!("{:?}", expr), format!("{:?}", expected_expr));
930 }
931
932 #[tokio::test]
933 async fn test_build_less_than_filter() {
934 let table_provider =
935 build_test_table_provider(&[("public".to_string(), "test_table".to_string())]).await;
936 let session_state = SessionStateBuilder::new().with_default_features().build();
937 let planner = LogQueryPlanner::new(table_provider, session_state);
938
939 let column_filter = ColumnFilters {
941 column_name: "message".to_string(),
942 filters: vec![ContentFilter::LessThan {
943 value: "error".to_string(),
944 inclusive: true,
945 }],
946 };
947
948 let expr_option = planner.build_column_filter(&column_filter).unwrap();
949 assert!(expr_option.is_some());
950
951 let expr = expr_option.unwrap();
952 let expected_expr = col("message").lt_eq(lit(ScalarValue::Utf8(Some("error".to_string()))));
953
954 assert_eq!(format!("{:?}", expr), format!("{:?}", expected_expr));
955
956 let column_filter = ColumnFilters {
958 column_name: "message".to_string(),
959 filters: vec![ContentFilter::LessThan {
960 value: "error".to_string(),
961 inclusive: false,
962 }],
963 };
964
965 let expr_option = planner.build_column_filter(&column_filter).unwrap();
966 assert!(expr_option.is_some());
967
968 let expr = expr_option.unwrap();
969 let expected_expr = col("message").lt(lit(ScalarValue::Utf8(Some("error".to_string()))));
970
971 assert_eq!(format!("{:?}", expr), format!("{:?}", expected_expr));
972 }
973
974 #[tokio::test]
975 async fn test_build_in_filter() {
976 let table_provider =
977 build_test_table_provider(&[("public".to_string(), "test_table".to_string())]).await;
978 let session_state = SessionStateBuilder::new().with_default_features().build();
979 let planner = LogQueryPlanner::new(table_provider, session_state);
980
981 let column_filter = ColumnFilters {
983 column_name: "message".to_string(),
984 filters: vec![ContentFilter::In(vec![
985 "error".to_string(),
986 "warning".to_string(),
987 "info".to_string(),
988 ])],
989 };
990
991 let expr_option = planner.build_column_filter(&column_filter).unwrap();
992 assert!(expr_option.is_some());
993
994 let expr = expr_option.unwrap();
995 let expected_expr = col("message").in_list(
996 vec![
997 lit(ScalarValue::Utf8(Some("error".to_string()))),
998 lit(ScalarValue::Utf8(Some("warning".to_string()))),
999 lit(ScalarValue::Utf8(Some("info".to_string()))),
1000 ],
1001 false,
1002 );
1003
1004 assert_eq!(format!("{:?}", expr), format!("{:?}", expected_expr));
1005 }
1006}