operator/statement/
tql.rs1use std::collections::HashMap;
16
17use common_query::Output;
18use common_telemetry::tracing;
19use datafusion_expr::LogicalPlan;
20use query::parser::{
21 PromQuery, QueryLanguageParser, ANALYZE_NODE_NAME, ANALYZE_VERBOSE_NODE_NAME,
22 DEFAULT_LOOKBACK_STRING, EXPLAIN_NODE_NAME, EXPLAIN_VERBOSE_NODE_NAME,
23};
24use session::context::QueryContextRef;
25use snafu::ResultExt;
26use sql::statements::tql::Tql;
27
28use crate::error::{ExecLogicalPlanSnafu, ParseQuerySnafu, PlanStatementSnafu, Result};
29use crate::statement::StatementExecutor;
30
31impl StatementExecutor {
32 #[tracing::instrument(skip_all)]
34 pub async fn plan_tql(&self, tql: Tql, query_ctx: &QueryContextRef) -> Result<LogicalPlan> {
35 let stmt = match tql {
36 Tql::Eval(eval) => {
37 let promql = PromQuery {
38 start: eval.start,
39 end: eval.end,
40 step: eval.step,
41 query: eval.query,
42 lookback: eval
43 .lookback
44 .unwrap_or_else(|| DEFAULT_LOOKBACK_STRING.to_string()),
45 };
46 QueryLanguageParser::parse_promql(&promql, query_ctx).context(ParseQuerySnafu)?
47 }
48 Tql::Explain(explain) => {
49 if let Some(format) = &explain.format {
50 query_ctx.set_explain_format(format.to_string());
51 }
52
53 let promql = PromQuery {
54 query: explain.query,
55 lookback: explain
56 .lookback
57 .unwrap_or_else(|| DEFAULT_LOOKBACK_STRING.to_string()),
58 ..PromQuery::default()
59 };
60 let explain_node_name = if explain.is_verbose {
61 EXPLAIN_VERBOSE_NODE_NAME
62 } else {
63 EXPLAIN_NODE_NAME
64 }
65 .to_string();
66 let params = HashMap::from([("name".to_string(), explain_node_name)]);
67 QueryLanguageParser::parse_promql(&promql, query_ctx)
68 .context(ParseQuerySnafu)?
69 .post_process(params)
70 .unwrap()
71 }
72 Tql::Analyze(analyze) => {
73 if let Some(format) = &analyze.format {
74 query_ctx.set_explain_format(format.to_string());
75 }
76
77 let promql = PromQuery {
78 start: analyze.start,
79 end: analyze.end,
80 step: analyze.step,
81 query: analyze.query,
82 lookback: analyze
83 .lookback
84 .unwrap_or_else(|| DEFAULT_LOOKBACK_STRING.to_string()),
85 };
86 let analyze_node_name = if analyze.is_verbose {
87 ANALYZE_VERBOSE_NODE_NAME
88 } else {
89 ANALYZE_NODE_NAME
90 }
91 .to_string();
92 let params = HashMap::from([("name".to_string(), analyze_node_name)]);
93 QueryLanguageParser::parse_promql(&promql, query_ctx)
94 .context(ParseQuerySnafu)?
95 .post_process(params)
96 .unwrap()
97 }
98 };
99 self.query_engine
100 .planner()
101 .plan(&stmt, query_ctx.clone())
102 .await
103 .context(PlanStatementSnafu)
104 }
105
106 #[tracing::instrument(skip_all)]
108 pub(super) async fn execute_tql(&self, tql: Tql, query_ctx: QueryContextRef) -> Result<Output> {
109 let plan = self.plan_tql(tql, &query_ctx).await?;
110 self.query_engine
111 .execute(plan, query_ctx)
112 .await
113 .context(ExecLogicalPlanSnafu)
114 }
115}