operator/statement/
tql.rs1use std::collections::HashMap;
16
17use common_query::Output;
18use common_telemetry::tracing;
19use datafusion_expr::LogicalPlan;
20use query::parser::{
21 ANALYZE_NODE_NAME, ANALYZE_VERBOSE_NODE_NAME, DEFAULT_LOOKBACK_STRING, EXPLAIN_NODE_NAME,
22 EXPLAIN_VERBOSE_NODE_NAME, PromQuery, QueryLanguageParser,
23};
24use session::context::QueryContextRef;
25use snafu::ResultExt;
26use sql::statements::tql::Tql;
27
28use crate::error::{ExecLogicalPlanSnafu, ParseQuerySnafu, PlanStatementSnafu, Result};
29use crate::statement::StatementExecutor;
30
31impl StatementExecutor {
32 #[tracing::instrument(skip_all)]
34 pub async fn plan_tql(&self, tql: Tql, query_ctx: &QueryContextRef) -> Result<LogicalPlan> {
35 let stmt = match tql {
36 Tql::Eval(eval) => {
37 let promql = PromQuery {
38 start: eval.start,
39 end: eval.end,
40 step: eval.step,
41 query: eval.query,
42 lookback: eval
43 .lookback
44 .unwrap_or_else(|| DEFAULT_LOOKBACK_STRING.to_string()),
45 alias: eval.alias,
46 };
47 QueryLanguageParser::parse_promql(&promql, query_ctx).context(ParseQuerySnafu)?
48 }
49 Tql::Explain(explain) => {
50 if let Some(format) = &explain.format {
51 query_ctx.set_explain_format(format.to_string());
52 }
53
54 let promql = PromQuery {
55 query: explain.query,
56 start: explain.start,
57 end: explain.end,
58 step: explain.step,
59 lookback: explain
60 .lookback
61 .unwrap_or_else(|| DEFAULT_LOOKBACK_STRING.to_string()),
62 alias: explain.alias,
63 };
64 let explain_node_name = if explain.is_verbose {
65 EXPLAIN_VERBOSE_NODE_NAME
66 } else {
67 EXPLAIN_NODE_NAME
68 }
69 .to_string();
70 let params = HashMap::from([("name".to_string(), explain_node_name)]);
71 QueryLanguageParser::parse_promql(&promql, query_ctx)
72 .context(ParseQuerySnafu)?
73 .post_process(params)
74 .context(ParseQuerySnafu)?
75 }
76 Tql::Analyze(analyze) => {
77 if let Some(format) = &analyze.format {
78 query_ctx.set_explain_format(format.to_string());
79 }
80
81 let promql = PromQuery {
82 start: analyze.start,
83 end: analyze.end,
84 step: analyze.step,
85 query: analyze.query,
86 lookback: analyze
87 .lookback
88 .unwrap_or_else(|| DEFAULT_LOOKBACK_STRING.to_string()),
89 alias: analyze.alias,
90 };
91 let analyze_node_name = if analyze.is_verbose {
92 ANALYZE_VERBOSE_NODE_NAME
93 } else {
94 ANALYZE_NODE_NAME
95 }
96 .to_string();
97 let params = HashMap::from([("name".to_string(), analyze_node_name)]);
98 QueryLanguageParser::parse_promql(&promql, query_ctx)
99 .context(ParseQuerySnafu)?
100 .post_process(params)
101 .context(ParseQuerySnafu)?
102 }
103 };
104 self.query_engine
105 .planner()
106 .plan(&stmt, query_ctx.clone())
107 .await
108 .context(PlanStatementSnafu)
109 }
110
111 #[tracing::instrument(skip_all)]
113 pub(super) async fn execute_tql(&self, tql: Tql, query_ctx: QueryContextRef) -> Result<Output> {
114 let plan = self.plan_tql(tql, &query_ctx).await?;
115 self.query_engine
116 .execute(plan, query_ctx)
117 .await
118 .context(ExecLogicalPlanSnafu)
119 }
120}