operator/statement/
tql.rs1use std::collections::HashMap;
16
17use common_query::Output;
18use common_telemetry::tracing;
19use datafusion_expr::LogicalPlan;
20use query::parser::{
21 ANALYZE_NODE_NAME, ANALYZE_VERBOSE_NODE_NAME, DEFAULT_LOOKBACK_STRING, EXPLAIN_NODE_NAME,
22 EXPLAIN_VERBOSE_NODE_NAME, PromQuery, QueryLanguageParser,
23};
24use session::context::QueryContextRef;
25use snafu::ResultExt;
26use sql::statements::tql::Tql;
27
28use crate::error::{ExecLogicalPlanSnafu, ParseQuerySnafu, PlanStatementSnafu, Result};
29use crate::statement::StatementExecutor;
30
31impl StatementExecutor {
32 #[tracing::instrument(skip_all)]
34 pub async fn plan_tql(&self, tql: Tql, query_ctx: &QueryContextRef) -> Result<LogicalPlan> {
35 let stmt = match tql {
36 Tql::Eval(eval) => {
37 let promql = PromQuery {
38 start: eval.start,
39 end: eval.end,
40 step: eval.step,
41 query: eval.query,
42 lookback: eval
43 .lookback
44 .unwrap_or_else(|| DEFAULT_LOOKBACK_STRING.to_string()),
45 alias: eval.alias,
46 };
47 QueryLanguageParser::parse_promql(&promql, query_ctx).context(ParseQuerySnafu)?
48 }
49 Tql::Explain(explain) => {
50 if let Some(format) = &explain.format {
51 query_ctx.set_explain_format(format.to_string());
52 }
53
54 let promql = PromQuery {
55 query: explain.query,
56 lookback: explain
57 .lookback
58 .unwrap_or_else(|| DEFAULT_LOOKBACK_STRING.to_string()),
59 ..PromQuery::default()
60 };
61 let explain_node_name = if explain.is_verbose {
62 EXPLAIN_VERBOSE_NODE_NAME
63 } else {
64 EXPLAIN_NODE_NAME
65 }
66 .to_string();
67 let params = HashMap::from([("name".to_string(), explain_node_name)]);
68 QueryLanguageParser::parse_promql(&promql, query_ctx)
69 .context(ParseQuerySnafu)?
70 .post_process(params)
71 .unwrap()
72 }
73 Tql::Analyze(analyze) => {
74 if let Some(format) = &analyze.format {
75 query_ctx.set_explain_format(format.to_string());
76 }
77
78 let promql = PromQuery {
79 start: analyze.start,
80 end: analyze.end,
81 step: analyze.step,
82 query: analyze.query,
83 lookback: analyze
84 .lookback
85 .unwrap_or_else(|| DEFAULT_LOOKBACK_STRING.to_string()),
86 alias: analyze.alias,
87 };
88 let analyze_node_name = if analyze.is_verbose {
89 ANALYZE_VERBOSE_NODE_NAME
90 } else {
91 ANALYZE_NODE_NAME
92 }
93 .to_string();
94 let params = HashMap::from([("name".to_string(), analyze_node_name)]);
95 QueryLanguageParser::parse_promql(&promql, query_ctx)
96 .context(ParseQuerySnafu)?
97 .post_process(params)
98 .unwrap()
99 }
100 };
101 self.query_engine
102 .planner()
103 .plan(&stmt, query_ctx.clone())
104 .await
105 .context(PlanStatementSnafu)
106 }
107
108 #[tracing::instrument(skip_all)]
110 pub(super) async fn execute_tql(&self, tql: Tql, query_ctx: QueryContextRef) -> Result<Output> {
111 let plan = self.plan_tql(tql, &query_ctx).await?;
112 self.query_engine
113 .execute(plan, query_ctx)
114 .await
115 .context(ExecLogicalPlanSnafu)
116 }
117}