operator/statement/
tql.rs1use std::collections::HashMap;
16
17use common_query::Output;
18use common_telemetry::tracing;
19use datafusion_expr::LogicalPlan;
20use query::parser::{
21 PromQuery, QueryLanguageParser, ANALYZE_NODE_NAME, ANALYZE_VERBOSE_NODE_NAME,
22 DEFAULT_LOOKBACK_STRING, EXPLAIN_NODE_NAME, EXPLAIN_VERBOSE_NODE_NAME,
23};
24use session::context::QueryContextRef;
25use snafu::ResultExt;
26use sql::statements::tql::Tql;
27
28use crate::error::{ExecLogicalPlanSnafu, ParseQuerySnafu, PlanStatementSnafu, Result};
29use crate::statement::StatementExecutor;
30
31impl StatementExecutor {
32 #[tracing::instrument(skip_all)]
34 pub async fn plan_tql(&self, tql: Tql, query_ctx: &QueryContextRef) -> Result<LogicalPlan> {
35 let stmt = match tql {
36 Tql::Eval(eval) => {
37 let promql = PromQuery {
38 start: eval.start,
39 end: eval.end,
40 step: eval.step,
41 query: eval.query,
42 lookback: eval
43 .lookback
44 .unwrap_or_else(|| DEFAULT_LOOKBACK_STRING.to_string()),
45 };
46 QueryLanguageParser::parse_promql(&promql, query_ctx).context(ParseQuerySnafu)?
47 }
48 Tql::Explain(explain) => {
49 let promql = PromQuery {
50 query: explain.query,
51 lookback: explain
52 .lookback
53 .unwrap_or_else(|| DEFAULT_LOOKBACK_STRING.to_string()),
54 ..PromQuery::default()
55 };
56 let explain_node_name = if explain.is_verbose {
57 EXPLAIN_VERBOSE_NODE_NAME
58 } else {
59 EXPLAIN_NODE_NAME
60 }
61 .to_string();
62 let params = HashMap::from([("name".to_string(), explain_node_name)]);
63 QueryLanguageParser::parse_promql(&promql, query_ctx)
64 .context(ParseQuerySnafu)?
65 .post_process(params)
66 .unwrap()
67 }
68 Tql::Analyze(analyze) => {
69 let promql = PromQuery {
70 start: analyze.start,
71 end: analyze.end,
72 step: analyze.step,
73 query: analyze.query,
74 lookback: analyze
75 .lookback
76 .unwrap_or_else(|| DEFAULT_LOOKBACK_STRING.to_string()),
77 };
78 let analyze_node_name = if analyze.is_verbose {
79 ANALYZE_VERBOSE_NODE_NAME
80 } else {
81 ANALYZE_NODE_NAME
82 }
83 .to_string();
84 let params = HashMap::from([("name".to_string(), analyze_node_name)]);
85 QueryLanguageParser::parse_promql(&promql, query_ctx)
86 .context(ParseQuerySnafu)?
87 .post_process(params)
88 .unwrap()
89 }
90 };
91 self.query_engine
92 .planner()
93 .plan(&stmt, query_ctx.clone())
94 .await
95 .context(PlanStatementSnafu)
96 }
97
98 #[tracing::instrument(skip_all)]
100 pub(super) async fn execute_tql(&self, tql: Tql, query_ctx: QueryContextRef) -> Result<Output> {
101 let plan = self.plan_tql(tql, &query_ctx).await?;
102 self.query_engine
103 .execute(plan, query_ctx)
104 .await
105 .context(ExecLogicalPlanSnafu)
106 }
107}