query/
parser.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::any::Any;
16use std::collections::HashMap;
17use std::sync::Arc;
18use std::time::{Duration, SystemTime};
19
20use chrono::DateTime;
21use common_error::ext::{BoxedError, PlainError};
22use common_error::status_code::StatusCode;
23use common_telemetry::tracing;
24use promql_parser::parser::Expr::Extension;
25use promql_parser::parser::ast::{Extension as NodeExtension, ExtensionExpr};
26use promql_parser::parser::value::ValueType;
27use promql_parser::parser::{EvalStmt, Expr};
28use session::context::QueryContextRef;
29use snafu::{OptionExt, ResultExt};
30use sql::dialect::GreptimeDbDialect;
31use sql::parser::{ParseOptions, ParserContext};
32use sql::statements::statement::Statement;
33
34use crate::error::{
35    AddSystemTimeOverflowSnafu, MultipleStatementsSnafu, ParseFloatSnafu, ParseTimestampSnafu,
36    QueryParseSnafu, Result, TryIntoDurationSnafu, UnimplementedSnafu,
37};
38use crate::metrics::{PARSE_PROMQL_ELAPSED, PARSE_SQL_ELAPSED};
39
40pub const DEFAULT_LOOKBACK_STRING: &str = "5m";
41pub const EXPLAIN_NODE_NAME: &str = "EXPLAIN";
42pub const EXPLAIN_VERBOSE_NODE_NAME: &str = "EXPLAIN VERBOSE";
43pub const ANALYZE_NODE_NAME: &str = "ANALYZE";
44pub const ANALYZE_VERBOSE_NODE_NAME: &str = "ANALYZE VERBOSE";
45
46#[derive(Debug, Clone)]
47pub enum QueryStatement {
48    Sql(Statement),
49    Promql(EvalStmt),
50}
51
52impl QueryStatement {
53    pub fn post_process(&self, params: HashMap<String, String>) -> Result<QueryStatement> {
54        match self {
55            QueryStatement::Sql(_) => UnimplementedSnafu {
56                operation: "sql post process",
57            }
58            .fail(),
59            QueryStatement::Promql(eval_stmt) => {
60                let node_name = match params.get("name") {
61                    Some(name) => name.as_str(),
62                    None => "",
63                };
64                let extension_node = Self::create_extension_node(node_name, &eval_stmt.expr);
65                Ok(QueryStatement::Promql(EvalStmt {
66                    expr: Extension(extension_node.unwrap()),
67                    start: eval_stmt.start,
68                    end: eval_stmt.end,
69                    interval: eval_stmt.interval,
70                    lookback_delta: eval_stmt.lookback_delta,
71                }))
72            }
73        }
74    }
75
76    fn create_extension_node(node_name: &str, expr: &Expr) -> Option<NodeExtension> {
77        match node_name {
78            ANALYZE_NODE_NAME => Some(NodeExtension {
79                expr: Arc::new(AnalyzeExpr { expr: expr.clone() }),
80            }),
81            ANALYZE_VERBOSE_NODE_NAME => Some(NodeExtension {
82                expr: Arc::new(AnalyzeVerboseExpr { expr: expr.clone() }),
83            }),
84            EXPLAIN_NODE_NAME => Some(NodeExtension {
85                expr: Arc::new(ExplainExpr { expr: expr.clone() }),
86            }),
87            EXPLAIN_VERBOSE_NODE_NAME => Some(NodeExtension {
88                expr: Arc::new(ExplainVerboseExpr { expr: expr.clone() }),
89            }),
90            _ => None,
91        }
92    }
93}
94
95#[derive(Debug, Clone, PartialEq, Eq)]
96pub struct PromQuery {
97    pub query: String,
98    pub start: String,
99    pub end: String,
100    pub step: String,
101    pub lookback: String,
102}
103
104impl Default for PromQuery {
105    fn default() -> Self {
106        PromQuery {
107            query: String::new(),
108            start: String::from("0"),
109            end: String::from("0"),
110            step: String::from("5m"),
111            lookback: String::from(DEFAULT_LOOKBACK_STRING),
112        }
113    }
114}
115
116/// Query language parser, supports parsing SQL and PromQL
117pub struct QueryLanguageParser {}
118
119impl QueryLanguageParser {
120    /// Try to parse SQL with GreptimeDB dialect, return the statement when success.
121    pub fn parse_sql(sql: &str, _query_ctx: &QueryContextRef) -> Result<QueryStatement> {
122        let _timer = PARSE_SQL_ELAPSED.start_timer();
123        let mut statement =
124            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
125                .map_err(BoxedError::new)
126                .context(QueryParseSnafu { query: sql })?;
127        if statement.len() != 1 {
128            MultipleStatementsSnafu {
129                query: sql.to_string(),
130            }
131            .fail()
132        } else {
133            Ok(QueryStatement::Sql(statement.pop().unwrap()))
134        }
135    }
136
137    /// Try to parse PromQL, return the statement when success.
138    #[tracing::instrument(skip_all)]
139    pub fn parse_promql(query: &PromQuery, _query_ctx: &QueryContextRef) -> Result<QueryStatement> {
140        let _timer = PARSE_PROMQL_ELAPSED.start_timer();
141
142        let expr = promql_parser::parser::parse(&query.query)
143            .map_err(|msg| BoxedError::new(PlainError::new(msg, StatusCode::InvalidArguments)))
144            .context(QueryParseSnafu {
145                query: &query.query,
146            })?;
147
148        let start = Self::parse_promql_timestamp(&query.start)
149            .map_err(BoxedError::new)
150            .context(QueryParseSnafu {
151                query: &query.query,
152            })?;
153
154        let end = Self::parse_promql_timestamp(&query.end)
155            .map_err(BoxedError::new)
156            .context(QueryParseSnafu {
157                query: &query.query,
158            })?;
159
160        let step = query
161            .step
162            .parse::<u64>()
163            .map(Duration::from_secs)
164            .or_else(|_| promql_parser::util::parse_duration(&query.step))
165            .map_err(|msg| BoxedError::new(PlainError::new(msg, StatusCode::InvalidArguments)))
166            .context(QueryParseSnafu {
167                query: &query.query,
168            })?;
169
170        let lookback_delta = query
171            .lookback
172            .parse::<u64>()
173            .map(Duration::from_secs)
174            .or_else(|_| promql_parser::util::parse_duration(&query.lookback))
175            .map_err(|msg| BoxedError::new(PlainError::new(msg, StatusCode::InvalidArguments)))
176            .context(QueryParseSnafu {
177                query: &query.query,
178            })?;
179
180        let eval_stmt = EvalStmt {
181            expr,
182            start,
183            end,
184            interval: step,
185            lookback_delta,
186        };
187
188        Ok(QueryStatement::Promql(eval_stmt))
189    }
190
191    pub fn parse_promql_timestamp(timestamp: &str) -> Result<SystemTime> {
192        // try rfc3339 format
193        let rfc3339_result = DateTime::parse_from_rfc3339(timestamp)
194            .context(ParseTimestampSnafu { raw: timestamp })
195            .map(Into::<SystemTime>::into);
196
197        // shorthand
198        if rfc3339_result.is_ok() {
199            return rfc3339_result;
200        }
201
202        // try float format
203        let secs = timestamp
204            .parse::<f64>()
205            .context(ParseFloatSnafu { raw: timestamp })
206            // also report rfc3339 error if float parsing fails
207            .map_err(|_| rfc3339_result.unwrap_err())?;
208
209        let duration =
210            Duration::try_from_secs_f64(secs).context(TryIntoDurationSnafu { raw: timestamp })?;
211        SystemTime::UNIX_EPOCH
212            .checked_add(duration)
213            .context(AddSystemTimeOverflowSnafu { duration })
214    }
215}
216
217macro_rules! define_node_ast_extension {
218    ($name:ident, $name_expr:ident, $expr_type:ty, $extension_name:expr) => {
219        /// The implementation of the `$name_expr` extension AST node
220        #[derive(Debug, Clone)]
221        pub struct $name_expr {
222            pub expr: $expr_type,
223        }
224
225        impl ExtensionExpr for $name_expr {
226            fn as_any(&self) -> &dyn Any {
227                self
228            }
229
230            fn name(&self) -> &str {
231                $extension_name
232            }
233
234            fn value_type(&self) -> ValueType {
235                self.expr.value_type()
236            }
237
238            fn children(&self) -> &[Expr] {
239                std::slice::from_ref(&self.expr)
240            }
241        }
242
243        #[allow(rustdoc::broken_intra_doc_links)]
244        #[derive(Debug, Clone)]
245        pub struct $name {
246            pub expr: Arc<$name_expr>,
247        }
248
249        impl $name {
250            pub fn new(expr: $expr_type) -> Self {
251                Self {
252                    expr: Arc::new($name_expr { expr }),
253                }
254            }
255        }
256    };
257}
258
259define_node_ast_extension!(Analyze, AnalyzeExpr, Expr, ANALYZE_NODE_NAME);
260define_node_ast_extension!(
261    AnalyzeVerbose,
262    AnalyzeVerboseExpr,
263    Expr,
264    ANALYZE_VERBOSE_NODE_NAME
265);
266define_node_ast_extension!(Explain, ExplainExpr, Expr, EXPLAIN_NODE_NAME);
267define_node_ast_extension!(
268    ExplainVerbose,
269    ExplainVerboseExpr,
270    Expr,
271    EXPLAIN_VERBOSE_NODE_NAME
272);
273
274#[cfg(test)]
275mod test {
276    use session::context::QueryContext;
277
278    use super::*;
279
280    // Detailed logic tests are covered in the parser crate.
281    #[test]
282    fn parse_sql_simple() {
283        let sql = "select * from t1";
284        let stmt = QueryLanguageParser::parse_sql(sql, &QueryContext::arc()).unwrap();
285        let QueryStatement::Sql(sql_stmt) = stmt else {
286            panic!("Expected SQL statement, got {:?}", stmt);
287        };
288        assert_eq!("SELECT * FROM t1", sql_stmt.to_string());
289    }
290
291    #[test]
292    fn parse_promql_timestamp() {
293        let cases = vec![
294            (
295                "1435781451.781",
296                SystemTime::UNIX_EPOCH
297                    .checked_add(Duration::from_secs_f64(1435781451.781))
298                    .unwrap(),
299            ),
300            ("0.000", SystemTime::UNIX_EPOCH),
301            ("00", SystemTime::UNIX_EPOCH),
302            (
303                "2015-07-01T20:10:51.781Z",
304                SystemTime::UNIX_EPOCH
305                    .checked_add(Duration::from_secs_f64(1435781451.781))
306                    .unwrap(),
307            ),
308            ("1970-01-01T00:00:00.000Z", SystemTime::UNIX_EPOCH),
309        ];
310
311        for (input, expected) in cases {
312            let result = QueryLanguageParser::parse_promql_timestamp(input).unwrap();
313
314            let result = result
315                .duration_since(SystemTime::UNIX_EPOCH)
316                .unwrap()
317                .as_millis();
318            let expected = expected
319                .duration_since(SystemTime::UNIX_EPOCH)
320                .unwrap()
321                .as_millis();
322
323            // assert difference < 0.1 second
324            assert!(result.abs_diff(expected) < 100);
325        }
326
327        // i64::MAX + 1
328        let timestamp = "9223372036854775808.000";
329        let result = QueryLanguageParser::parse_promql_timestamp(timestamp);
330        assert_eq!(
331            result.unwrap_err().to_string(),
332            "Failed to add duration '9223372036854775808s' to SystemTime, overflowed"
333        );
334    }
335
336    #[test]
337    fn parse_promql_simple() {
338        let promql = PromQuery {
339            query: "http_request".to_string(),
340            start: "2022-02-13T17:14:00Z".to_string(),
341            end: "2023-02-13T17:14:00Z".to_string(),
342            step: "1d".to_string(),
343            lookback: "5m".to_string(),
344        };
345
346        #[cfg(not(windows))]
347        let expected = String::from(
348            "\
349            Promql(EvalStmt { \
350                expr: VectorSelector(VectorSelector { \
351                    name: Some(\"http_request\"), \
352                    matchers: Matchers { matchers: [], or_matchers: [] }, \
353                    offset: None, at: None }), \
354                start: SystemTime { tv_sec: 1644772440, tv_nsec: 0 }, \
355                end: SystemTime { tv_sec: 1676308440, tv_nsec: 0 }, \
356                interval: 86400s, \
357                lookback_delta: 300s \
358            })",
359        );
360
361        // Windows has different debug output for SystemTime.
362        #[cfg(windows)]
363        let expected = String::from(
364            "\
365            Promql(EvalStmt { \
366                expr: VectorSelector(VectorSelector { \
367                    name: Some(\"http_request\"), \
368                    matchers: Matchers { matchers: [], or_matchers: [] }, \
369                    offset: None, at: None }), \
370                start: SystemTime { intervals: 132892460400000000 }, \
371                end: SystemTime { intervals: 133207820400000000 }, \
372                interval: 86400s, \
373                lookback_delta: 300s \
374            })",
375        );
376
377        let result = QueryLanguageParser::parse_promql(&promql, &QueryContext::arc()).unwrap();
378        assert_eq!(format!("{result:?}"), expected);
379    }
380}