query/
parser.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::any::Any;
16use std::collections::HashMap;
17use std::sync::Arc;
18use std::time::{Duration, SystemTime};
19
20use chrono::DateTime;
21use common_error::ext::{BoxedError, PlainError};
22use common_error::status_code::StatusCode;
23use common_telemetry::tracing;
24use promql_parser::parser::ast::{Extension as NodeExtension, ExtensionExpr};
25use promql_parser::parser::value::ValueType;
26use promql_parser::parser::Expr::Extension;
27use promql_parser::parser::{EvalStmt, Expr};
28use session::context::QueryContextRef;
29use snafu::{OptionExt, ResultExt};
30use sql::dialect::GreptimeDbDialect;
31use sql::parser::{ParseOptions, ParserContext};
32use sql::statements::statement::Statement;
33
34use crate::error::{
35    AddSystemTimeOverflowSnafu, MultipleStatementsSnafu, ParseFloatSnafu, ParseTimestampSnafu,
36    QueryParseSnafu, Result, UnimplementedSnafu,
37};
38use crate::metrics::{PARSE_PROMQL_ELAPSED, PARSE_SQL_ELAPSED};
39
40pub const DEFAULT_LOOKBACK_STRING: &str = "5m";
41pub const EXPLAIN_NODE_NAME: &str = "EXPLAIN";
42pub const EXPLAIN_VERBOSE_NODE_NAME: &str = "EXPLAIN VERBOSE";
43pub const ANALYZE_NODE_NAME: &str = "ANALYZE";
44pub const ANALYZE_VERBOSE_NODE_NAME: &str = "ANALYZE VERBOSE";
45
46#[derive(Debug, Clone)]
47pub enum QueryStatement {
48    Sql(Statement),
49    Promql(EvalStmt),
50}
51
52impl QueryStatement {
53    pub fn post_process(&self, params: HashMap<String, String>) -> Result<QueryStatement> {
54        match self {
55            QueryStatement::Sql(_) => UnimplementedSnafu {
56                operation: "sql post process",
57            }
58            .fail(),
59            QueryStatement::Promql(eval_stmt) => {
60                let node_name = match params.get("name") {
61                    Some(name) => name.as_str(),
62                    None => "",
63                };
64                let extension_node = Self::create_extension_node(node_name, &eval_stmt.expr);
65                Ok(QueryStatement::Promql(EvalStmt {
66                    expr: Extension(extension_node.unwrap()),
67                    start: eval_stmt.start,
68                    end: eval_stmt.end,
69                    interval: eval_stmt.interval,
70                    lookback_delta: eval_stmt.lookback_delta,
71                }))
72            }
73        }
74    }
75
76    fn create_extension_node(node_name: &str, expr: &Expr) -> Option<NodeExtension> {
77        match node_name {
78            ANALYZE_NODE_NAME => Some(NodeExtension {
79                expr: Arc::new(AnalyzeExpr { expr: expr.clone() }),
80            }),
81            ANALYZE_VERBOSE_NODE_NAME => Some(NodeExtension {
82                expr: Arc::new(AnalyzeVerboseExpr { expr: expr.clone() }),
83            }),
84            EXPLAIN_NODE_NAME => Some(NodeExtension {
85                expr: Arc::new(ExplainExpr { expr: expr.clone() }),
86            }),
87            EXPLAIN_VERBOSE_NODE_NAME => Some(NodeExtension {
88                expr: Arc::new(ExplainVerboseExpr { expr: expr.clone() }),
89            }),
90            _ => None,
91        }
92    }
93}
94
95#[derive(Debug, Clone, PartialEq, Eq)]
96pub struct PromQuery {
97    pub query: String,
98    pub start: String,
99    pub end: String,
100    pub step: String,
101    pub lookback: String,
102}
103
104impl Default for PromQuery {
105    fn default() -> Self {
106        PromQuery {
107            query: String::new(),
108            start: String::from("0"),
109            end: String::from("0"),
110            step: String::from("5m"),
111            lookback: String::from(DEFAULT_LOOKBACK_STRING),
112        }
113    }
114}
115
116/// Query language parser, supports parsing SQL and PromQL
117pub struct QueryLanguageParser {}
118
119impl QueryLanguageParser {
120    /// Try to parse SQL with GreptimeDB dialect, return the statement when success.
121    pub fn parse_sql(sql: &str, _query_ctx: &QueryContextRef) -> Result<QueryStatement> {
122        let _timer = PARSE_SQL_ELAPSED.start_timer();
123        let mut statement =
124            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
125                .map_err(BoxedError::new)
126                .context(QueryParseSnafu { query: sql })?;
127        if statement.len() != 1 {
128            MultipleStatementsSnafu {
129                query: sql.to_string(),
130            }
131            .fail()
132        } else {
133            Ok(QueryStatement::Sql(statement.pop().unwrap()))
134        }
135    }
136
137    /// Try to parse PromQL, return the statement when success.
138    #[tracing::instrument(skip_all)]
139    pub fn parse_promql(query: &PromQuery, _query_ctx: &QueryContextRef) -> Result<QueryStatement> {
140        let _timer = PARSE_PROMQL_ELAPSED.start_timer();
141
142        let expr = promql_parser::parser::parse(&query.query)
143            .map_err(|msg| BoxedError::new(PlainError::new(msg, StatusCode::InvalidArguments)))
144            .context(QueryParseSnafu {
145                query: &query.query,
146            })?;
147
148        let start = Self::parse_promql_timestamp(&query.start)
149            .map_err(BoxedError::new)
150            .context(QueryParseSnafu {
151                query: &query.query,
152            })?;
153
154        let end = Self::parse_promql_timestamp(&query.end)
155            .map_err(BoxedError::new)
156            .context(QueryParseSnafu {
157                query: &query.query,
158            })?;
159
160        let step = query
161            .step
162            .parse::<u64>()
163            .map(Duration::from_secs)
164            .or_else(|_| promql_parser::util::parse_duration(&query.step))
165            .map_err(|msg| BoxedError::new(PlainError::new(msg, StatusCode::InvalidArguments)))
166            .context(QueryParseSnafu {
167                query: &query.query,
168            })?;
169
170        let lookback_delta = query
171            .lookback
172            .parse::<u64>()
173            .map(Duration::from_secs)
174            .or_else(|_| promql_parser::util::parse_duration(&query.lookback))
175            .map_err(|msg| BoxedError::new(PlainError::new(msg, StatusCode::InvalidArguments)))
176            .context(QueryParseSnafu {
177                query: &query.query,
178            })?;
179
180        let eval_stmt = EvalStmt {
181            expr,
182            start,
183            end,
184            interval: step,
185            lookback_delta,
186        };
187
188        Ok(QueryStatement::Promql(eval_stmt))
189    }
190
191    pub fn parse_promql_timestamp(timestamp: &str) -> Result<SystemTime> {
192        // try rfc3339 format
193        let rfc3339_result = DateTime::parse_from_rfc3339(timestamp)
194            .context(ParseTimestampSnafu { raw: timestamp })
195            .map(Into::<SystemTime>::into);
196
197        // shorthand
198        if rfc3339_result.is_ok() {
199            return rfc3339_result;
200        }
201
202        // try float format
203        let secs = timestamp
204            .parse::<f64>()
205            .context(ParseFloatSnafu { raw: timestamp })
206            // also report rfc3339 error if float parsing fails
207            .map_err(|_| rfc3339_result.unwrap_err())?;
208
209        let duration = Duration::from_secs_f64(secs);
210        SystemTime::UNIX_EPOCH
211            .checked_add(duration)
212            .context(AddSystemTimeOverflowSnafu { duration })
213    }
214}
215
216macro_rules! define_node_ast_extension {
217    ($name:ident, $name_expr:ident, $expr_type:ty, $extension_name:expr) => {
218        /// The implementation of the `$name_expr` extension AST node
219        #[derive(Debug, Clone)]
220        pub struct $name_expr {
221            pub expr: $expr_type,
222        }
223
224        impl ExtensionExpr for $name_expr {
225            fn as_any(&self) -> &dyn Any {
226                self
227            }
228
229            fn name(&self) -> &str {
230                $extension_name
231            }
232
233            fn value_type(&self) -> ValueType {
234                self.expr.value_type()
235            }
236
237            fn children(&self) -> &[Expr] {
238                std::slice::from_ref(&self.expr)
239            }
240        }
241
242        #[allow(rustdoc::broken_intra_doc_links)]
243        #[derive(Debug, Clone)]
244        pub struct $name {
245            pub expr: Arc<$name_expr>,
246        }
247
248        impl $name {
249            pub fn new(expr: $expr_type) -> Self {
250                Self {
251                    expr: Arc::new($name_expr { expr }),
252                }
253            }
254        }
255    };
256}
257
258define_node_ast_extension!(Analyze, AnalyzeExpr, Expr, ANALYZE_NODE_NAME);
259define_node_ast_extension!(
260    AnalyzeVerbose,
261    AnalyzeVerboseExpr,
262    Expr,
263    ANALYZE_VERBOSE_NODE_NAME
264);
265define_node_ast_extension!(Explain, ExplainExpr, Expr, EXPLAIN_NODE_NAME);
266define_node_ast_extension!(
267    ExplainVerbose,
268    ExplainVerboseExpr,
269    Expr,
270    EXPLAIN_VERBOSE_NODE_NAME
271);
272
273#[cfg(test)]
274mod test {
275    use session::context::QueryContext;
276
277    use super::*;
278
279    // Detailed logic tests are covered in the parser crate.
280    #[test]
281    fn parse_sql_simple() {
282        let sql = "select * from t1";
283        let stmt = QueryLanguageParser::parse_sql(sql, &QueryContext::arc()).unwrap();
284        let QueryStatement::Sql(sql_stmt) = stmt else {
285            panic!("Expected SQL statement, got {:?}", stmt);
286        };
287        assert_eq!("SELECT * FROM t1", sql_stmt.to_string());
288    }
289
290    #[test]
291    fn parse_promql_timestamp() {
292        let cases = vec![
293            (
294                "1435781451.781",
295                SystemTime::UNIX_EPOCH
296                    .checked_add(Duration::from_secs_f64(1435781451.781))
297                    .unwrap(),
298            ),
299            ("0.000", SystemTime::UNIX_EPOCH),
300            ("00", SystemTime::UNIX_EPOCH),
301            (
302                "2015-07-01T20:10:51.781Z",
303                SystemTime::UNIX_EPOCH
304                    .checked_add(Duration::from_secs_f64(1435781451.781))
305                    .unwrap(),
306            ),
307            ("1970-01-01T00:00:00.000Z", SystemTime::UNIX_EPOCH),
308        ];
309
310        for (input, expected) in cases {
311            let result = QueryLanguageParser::parse_promql_timestamp(input).unwrap();
312
313            let result = result
314                .duration_since(SystemTime::UNIX_EPOCH)
315                .unwrap()
316                .as_millis();
317            let expected = expected
318                .duration_since(SystemTime::UNIX_EPOCH)
319                .unwrap()
320                .as_millis();
321
322            // assert difference < 0.1 second
323            assert!(result.abs_diff(expected) < 100);
324        }
325
326        // i64::MAX + 1
327        let timestamp = "9223372036854775808.000";
328        let result = QueryLanguageParser::parse_promql_timestamp(timestamp);
329        assert_eq!(
330            result.unwrap_err().to_string(),
331            "Failed to add duration '9223372036854775808s' to SystemTime, overflowed"
332        );
333    }
334
335    #[test]
336    fn parse_promql_simple() {
337        let promql = PromQuery {
338            query: "http_request".to_string(),
339            start: "2022-02-13T17:14:00Z".to_string(),
340            end: "2023-02-13T17:14:00Z".to_string(),
341            step: "1d".to_string(),
342            lookback: "5m".to_string(),
343        };
344
345        #[cfg(not(windows))]
346        let expected = String::from(
347            "\
348            Promql(EvalStmt { \
349                expr: VectorSelector(VectorSelector { \
350                    name: Some(\"http_request\"), \
351                    matchers: Matchers { matchers: [], or_matchers: [] }, \
352                    offset: None, at: None }), \
353                start: SystemTime { tv_sec: 1644772440, tv_nsec: 0 }, \
354                end: SystemTime { tv_sec: 1676308440, tv_nsec: 0 }, \
355                interval: 86400s, \
356                lookback_delta: 300s \
357            })",
358        );
359
360        // Windows has different debug output for SystemTime.
361        #[cfg(windows)]
362        let expected = String::from(
363            "\
364            Promql(EvalStmt { \
365                expr: VectorSelector(VectorSelector { \
366                    name: Some(\"http_request\"), \
367                    matchers: Matchers { matchers: [], or_matchers: [] }, \
368                    offset: None, at: None }), \
369                start: SystemTime { intervals: 132892460400000000 }, \
370                end: SystemTime { intervals: 133207820400000000 }, \
371                interval: 86400s, \
372                lookback_delta: 300s \
373            })",
374        );
375
376        let result = QueryLanguageParser::parse_promql(&promql, &QueryContext::arc()).unwrap();
377        assert_eq!(format!("{result:?}"), expected);
378    }
379}