query/
parser.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::any::Any;
16use std::collections::HashMap;
17use std::sync::Arc;
18use std::time::{Duration, SystemTime};
19
20use chrono::DateTime;
21use common_error::ext::{BoxedError, PlainError};
22use common_error::status_code::StatusCode;
23use common_telemetry::tracing;
24use promql_parser::parser::Expr::Extension;
25use promql_parser::parser::ast::{Extension as NodeExtension, ExtensionExpr};
26use promql_parser::parser::value::ValueType;
27use promql_parser::parser::{EvalStmt, Expr};
28use session::context::QueryContextRef;
29use snafu::{OptionExt, ResultExt};
30use sql::dialect::GreptimeDbDialect;
31use sql::parser::{ParseOptions, ParserContext};
32use sql::statements::statement::Statement;
33
34use crate::error::{
35    AddSystemTimeOverflowSnafu, MultipleStatementsSnafu, ParseFloatSnafu, ParseTimestampSnafu,
36    QueryParseSnafu, Result, TryIntoDurationSnafu, UnimplementedSnafu,
37};
38use crate::metrics::{PARSE_PROMQL_ELAPSED, PARSE_SQL_ELAPSED};
39
/// Default lookback delta, written as a PromQL duration string.
/// Used as the `lookback` value of `PromQuery::default()`.
pub const DEFAULT_LOOKBACK_STRING: &str = "5m";
/// Name of the `EXPLAIN` extension node (see `ExplainExpr`).
pub const EXPLAIN_NODE_NAME: &str = "EXPLAIN";
/// Name of the `EXPLAIN VERBOSE` extension node (see `ExplainVerboseExpr`).
pub const EXPLAIN_VERBOSE_NODE_NAME: &str = "EXPLAIN VERBOSE";
/// Name of the `ANALYZE` extension node (see `AnalyzeExpr`).
pub const ANALYZE_NODE_NAME: &str = "ANALYZE";
/// Name of the `ANALYZE VERBOSE` extension node (see `AnalyzeVerboseExpr`).
pub const ANALYZE_VERBOSE_NODE_NAME: &str = "ANALYZE VERBOSE";
/// Name of the alias extension node (see `AliasExpr`).
pub const ALIAS_NODE_NAME: &str = "ALIAS";
46
47#[derive(Debug, Clone)]
48pub enum QueryStatement {
49    Sql(Statement),
50    // The optional String is the alias name
51    Promql(EvalStmt, Option<String>),
52}
53
54impl QueryStatement {
55    pub fn post_process(&self, params: HashMap<String, String>) -> Result<QueryStatement> {
56        match self {
57            QueryStatement::Sql(_) => UnimplementedSnafu {
58                operation: "sql post process",
59            }
60            .fail(),
61            QueryStatement::Promql(eval_stmt, alias) => {
62                let node_name = match params.get("name") {
63                    Some(name) => name.as_str(),
64                    None => "",
65                };
66                let extension_node = Self::create_extension_node(node_name, &eval_stmt.expr);
67                Ok(QueryStatement::Promql(
68                    EvalStmt {
69                        expr: Extension(extension_node.unwrap()),
70                        start: eval_stmt.start,
71                        end: eval_stmt.end,
72                        interval: eval_stmt.interval,
73                        lookback_delta: eval_stmt.lookback_delta,
74                    },
75                    alias.clone(),
76                ))
77            }
78        }
79    }
80
81    fn create_extension_node(node_name: &str, expr: &Expr) -> Option<NodeExtension> {
82        match node_name {
83            ANALYZE_NODE_NAME => Some(NodeExtension {
84                expr: Arc::new(AnalyzeExpr { expr: expr.clone() }),
85            }),
86            ANALYZE_VERBOSE_NODE_NAME => Some(NodeExtension {
87                expr: Arc::new(AnalyzeVerboseExpr { expr: expr.clone() }),
88            }),
89            EXPLAIN_NODE_NAME => Some(NodeExtension {
90                expr: Arc::new(ExplainExpr { expr: expr.clone() }),
91            }),
92            EXPLAIN_VERBOSE_NODE_NAME => Some(NodeExtension {
93                expr: Arc::new(ExplainVerboseExpr { expr: expr.clone() }),
94            }),
95            _ => None,
96        }
97    }
98}
99
100#[derive(Debug, Clone, PartialEq, Eq)]
101pub struct PromQuery {
102    pub query: String,
103    pub start: String,
104    pub end: String,
105    pub step: String,
106    pub lookback: String,
107    pub alias: Option<String>,
108}
109
110impl Default for PromQuery {
111    fn default() -> Self {
112        PromQuery {
113            query: String::new(),
114            start: String::from("0"),
115            end: String::from("0"),
116            step: String::from("5m"),
117            lookback: String::from(DEFAULT_LOOKBACK_STRING),
118            alias: None,
119        }
120    }
121}
122
123/// Query language parser, supports parsing SQL and PromQL
124pub struct QueryLanguageParser {}
125
126impl QueryLanguageParser {
127    /// Try to parse SQL with GreptimeDB dialect, return the statement when success.
128    pub fn parse_sql(sql: &str, _query_ctx: &QueryContextRef) -> Result<QueryStatement> {
129        let _timer = PARSE_SQL_ELAPSED.start_timer();
130        let mut statement =
131            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
132                .map_err(BoxedError::new)
133                .context(QueryParseSnafu { query: sql })?;
134        if statement.len() != 1 {
135            MultipleStatementsSnafu {
136                query: sql.to_string(),
137            }
138            .fail()
139        } else {
140            Ok(QueryStatement::Sql(statement.pop().unwrap()))
141        }
142    }
143
144    /// Try to parse PromQL, return the statement when success.
145    #[tracing::instrument(skip_all)]
146    pub fn parse_promql(query: &PromQuery, _query_ctx: &QueryContextRef) -> Result<QueryStatement> {
147        let _timer = PARSE_PROMQL_ELAPSED.start_timer();
148
149        let expr = promql_parser::parser::parse(&query.query)
150            .map_err(|msg| BoxedError::new(PlainError::new(msg, StatusCode::InvalidArguments)))
151            .context(QueryParseSnafu {
152                query: &query.query,
153            })?;
154
155        let start = Self::parse_promql_timestamp(&query.start)
156            .map_err(BoxedError::new)
157            .context(QueryParseSnafu {
158                query: &query.query,
159            })?;
160
161        let end = Self::parse_promql_timestamp(&query.end)
162            .map_err(BoxedError::new)
163            .context(QueryParseSnafu {
164                query: &query.query,
165            })?;
166
167        let step = query
168            .step
169            .parse::<u64>()
170            .map(Duration::from_secs)
171            .or_else(|_| promql_parser::util::parse_duration(&query.step))
172            .map_err(|msg| BoxedError::new(PlainError::new(msg, StatusCode::InvalidArguments)))
173            .context(QueryParseSnafu {
174                query: &query.query,
175            })?;
176
177        let lookback_delta = query
178            .lookback
179            .parse::<u64>()
180            .map(Duration::from_secs)
181            .or_else(|_| promql_parser::util::parse_duration(&query.lookback))
182            .map_err(|msg| BoxedError::new(PlainError::new(msg, StatusCode::InvalidArguments)))
183            .context(QueryParseSnafu {
184                query: &query.query,
185            })?;
186
187        let eval_stmt = EvalStmt {
188            expr,
189            start,
190            end,
191            interval: step,
192            lookback_delta,
193        };
194        if let Some(alias) = &query.alias {
195            let eval_stmt = Self::apply_alias_extension(eval_stmt, alias);
196            return Ok(QueryStatement::Promql(eval_stmt, query.alias.clone()));
197        }
198        Ok(QueryStatement::Promql(eval_stmt, None))
199    }
200
201    pub(crate) fn apply_alias_extension(mut eval_stmt: EvalStmt, alias: &str) -> EvalStmt {
202        eval_stmt.expr = Extension(NodeExtension {
203            expr: Arc::new(AliasExpr {
204                expr: eval_stmt.expr.clone(),
205                alias: alias.to_string(),
206            }),
207        });
208        eval_stmt
209    }
210
211    pub fn parse_promql_timestamp(timestamp: &str) -> Result<SystemTime> {
212        // try rfc3339 format
213        let rfc3339_result = DateTime::parse_from_rfc3339(timestamp)
214            .context(ParseTimestampSnafu { raw: timestamp })
215            .map(Into::<SystemTime>::into);
216
217        // shorthand
218        if rfc3339_result.is_ok() {
219            return rfc3339_result;
220        }
221
222        // try float format
223        let secs = timestamp
224            .parse::<f64>()
225            .context(ParseFloatSnafu { raw: timestamp })
226            // also report rfc3339 error if float parsing fails
227            .map_err(|_| rfc3339_result.unwrap_err())?;
228
229        let duration =
230            Duration::try_from_secs_f64(secs).context(TryIntoDurationSnafu { raw: timestamp })?;
231        SystemTime::UNIX_EPOCH
232            .checked_add(duration)
233            .context(AddSystemTimeOverflowSnafu { duration })
234    }
235}
236
237macro_rules! define_node_ast_extension {
238    ($name:ident, $name_expr:ident, $expr_type:ty, $extension_name:expr) => {
239        /// The implementation of the `$name_expr` extension AST node
240        #[derive(Debug, Clone)]
241        pub struct $name_expr {
242            pub expr: $expr_type,
243        }
244
245        impl ExtensionExpr for $name_expr {
246            fn as_any(&self) -> &dyn Any {
247                self
248            }
249
250            fn name(&self) -> &str {
251                $extension_name
252            }
253
254            fn value_type(&self) -> ValueType {
255                self.expr.value_type()
256            }
257
258            fn children(&self) -> &[Expr] {
259                std::slice::from_ref(&self.expr)
260            }
261        }
262
263        #[allow(rustdoc::broken_intra_doc_links)]
264        #[derive(Debug, Clone)]
265        pub struct $name {
266            pub expr: Arc<$name_expr>,
267        }
268
269        impl $name {
270            pub fn new(expr: $expr_type) -> Self {
271                Self {
272                    expr: Arc::new($name_expr { expr }),
273                }
274            }
275        }
276    };
277}
278
279define_node_ast_extension!(Analyze, AnalyzeExpr, Expr, ANALYZE_NODE_NAME);
280define_node_ast_extension!(
281    AnalyzeVerbose,
282    AnalyzeVerboseExpr,
283    Expr,
284    ANALYZE_VERBOSE_NODE_NAME
285);
286define_node_ast_extension!(Explain, ExplainExpr, Expr, EXPLAIN_NODE_NAME);
287define_node_ast_extension!(
288    ExplainVerbose,
289    ExplainVerboseExpr,
290    Expr,
291    EXPLAIN_VERBOSE_NODE_NAME
292);
293#[derive(Debug, Clone)]
294pub struct AliasExpr {
295    pub expr: Expr,
296    pub alias: String,
297}
298impl ExtensionExpr for AliasExpr {
299    fn as_any(&self) -> &dyn Any {
300        self
301    }
302    fn name(&self) -> &str {
303        ALIAS_NODE_NAME
304    }
305    fn value_type(&self) -> ValueType {
306        self.expr.value_type()
307    }
308    fn children(&self) -> &[Expr] {
309        std::slice::from_ref(&self.expr)
310    }
311}
312#[derive(Debug, Clone)]
313pub struct Alias {
314    pub expr: Arc<AliasExpr>,
315}
316impl Alias {
317    pub fn new(expr: Expr, alias: String) -> Self {
318        Self {
319            expr: Arc::new(AliasExpr { expr, alias }),
320        }
321    }
322}
323
#[cfg(test)]
mod test {
    use session::context::QueryContext;

    use super::*;

    // Detailed logic tests are covered in the parser crate.
    #[test]
    fn parse_sql_simple() {
        let parsed =
            QueryLanguageParser::parse_sql("select * from t1", &QueryContext::arc()).unwrap();
        match parsed {
            QueryStatement::Sql(sql_stmt) => {
                assert_eq!("SELECT * FROM t1", sql_stmt.to_string())
            }
            other => panic!("Expected SQL statement, got {:?}", other),
        }
    }

    #[test]
    fn parse_promql_timestamp() {
        let epoch = SystemTime::UNIX_EPOCH;
        let at_secs = |secs: f64| epoch.checked_add(Duration::from_secs_f64(secs)).unwrap();
        let millis_since_epoch =
            |time: SystemTime| time.duration_since(epoch).unwrap().as_millis();

        // Each input is paired with the time it is expected to parse to.
        let cases = [
            ("1435781451.781", at_secs(1435781451.781)),
            ("0.000", epoch),
            ("00", epoch),
            ("2015-07-01T20:10:51.781Z", at_secs(1435781451.781)),
            ("1970-01-01T00:00:00.000Z", epoch),
        ];

        for (input, expected) in cases {
            let parsed = QueryLanguageParser::parse_promql_timestamp(input).unwrap();

            // assert difference < 0.1 second
            let drift = millis_since_epoch(parsed).abs_diff(millis_since_epoch(expected));
            assert!(drift < 100);
        }

        // i64::MAX + 1
        let overflow = QueryLanguageParser::parse_promql_timestamp("9223372036854775808.000");
        assert_eq!(
            overflow.unwrap_err().to_string(),
            "Failed to add duration '9223372036854775808s' to SystemTime, overflowed"
        );
    }

    #[test]
    fn parse_promql_simple() {
        let promql = PromQuery {
            query: "http_request".to_string(),
            start: "2022-02-13T17:14:00Z".to_string(),
            end: "2023-02-13T17:14:00Z".to_string(),
            step: "1d".to_string(),
            lookback: "5m".to_string(),
            alias: Some("my_query".to_string()),
        };

        // Only the Debug output of `SystemTime` differs between platforms.
        #[cfg(not(windows))]
        let time_range = "start: SystemTime { tv_sec: 1644772440, tv_nsec: 0 }, \
                          end: SystemTime { tv_sec: 1676308440, tv_nsec: 0 }, ";

        // Windows has different debug output for SystemTime.
        #[cfg(windows)]
        let time_range = "start: SystemTime { intervals: 132892460400000000 }, \
                          end: SystemTime { intervals: 133207820400000000 }, ";

        let expected = [
            "Promql(EvalStmt { \
             expr: Extension(Extension { \
             expr: AliasExpr { \
             expr: VectorSelector(VectorSelector { \
             name: Some(\"http_request\"), \
             matchers: Matchers { matchers: [], or_matchers: [] }, \
             offset: None, at: None \
             }), \
             alias: \"my_query\" \
             } \
             }), ",
            time_range,
            "interval: 86400s, \
             lookback_delta: 300s \
             }, Some(\"my_query\"))",
        ]
        .concat();

        let result = QueryLanguageParser::parse_promql(&promql, &QueryContext::arc()).unwrap();
        assert_eq!(format!("{result:?}"), expected);
    }
}