query/
parser.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::any::Any;
16use std::collections::HashMap;
17use std::sync::Arc;
18use std::time::{Duration, SystemTime};
19
20use chrono::DateTime;
21use common_error::ext::{BoxedError, PlainError};
22use common_error::status_code::StatusCode;
23use common_telemetry::tracing;
24use promql_parser::parser::Expr::Extension;
25use promql_parser::parser::ast::{Extension as NodeExtension, ExtensionExpr};
26use promql_parser::parser::value::ValueType;
27use promql_parser::parser::{EvalStmt, Expr};
28use session::context::QueryContextRef;
29use snafu::{OptionExt, ResultExt};
30use sql::dialect::GreptimeDbDialect;
31use sql::parser::{ParseOptions, ParserContext};
32use sql::statements::statement::Statement;
33
34use crate::error::{
35    AddSystemTimeOverflowSnafu, MultipleStatementsSnafu, ParseFloatSnafu, ParseTimestampSnafu,
36    QueryParseSnafu, Result, TryIntoDurationSnafu, UnimplementedSnafu,
37};
38use crate::metrics::{PARSE_PROMQL_ELAPSED, PARSE_SQL_ELAPSED};
39
40pub const DEFAULT_LOOKBACK_STRING: &str = "5m";
41pub const EXPLAIN_NODE_NAME: &str = "EXPLAIN";
42pub const EXPLAIN_VERBOSE_NODE_NAME: &str = "EXPLAIN VERBOSE";
43pub const ANALYZE_NODE_NAME: &str = "ANALYZE";
44pub const ANALYZE_VERBOSE_NODE_NAME: &str = "ANALYZE VERBOSE";
45
46#[derive(Debug, Clone)]
47pub enum QueryStatement {
48    Sql(Statement),
49    // The optional String is the alias name
50    Promql(EvalStmt, Option<String>),
51}
52
53impl QueryStatement {
54    pub fn post_process(&self, params: HashMap<String, String>) -> Result<QueryStatement> {
55        match self {
56            QueryStatement::Sql(_) => UnimplementedSnafu {
57                operation: "sql post process",
58            }
59            .fail(),
60            QueryStatement::Promql(eval_stmt, alias) => {
61                let node_name = match params.get("name") {
62                    Some(name) => name.as_str(),
63                    None => "",
64                };
65                let extension_node = Self::create_extension_node(node_name, &eval_stmt.expr);
66                Ok(QueryStatement::Promql(
67                    EvalStmt {
68                        expr: Extension(extension_node.unwrap()),
69                        start: eval_stmt.start,
70                        end: eval_stmt.end,
71                        interval: eval_stmt.interval,
72                        lookback_delta: eval_stmt.lookback_delta,
73                    },
74                    alias.clone(),
75                ))
76            }
77        }
78    }
79
80    fn create_extension_node(node_name: &str, expr: &Expr) -> Option<NodeExtension> {
81        match node_name {
82            ANALYZE_NODE_NAME => Some(NodeExtension {
83                expr: Arc::new(AnalyzeExpr { expr: expr.clone() }),
84            }),
85            ANALYZE_VERBOSE_NODE_NAME => Some(NodeExtension {
86                expr: Arc::new(AnalyzeVerboseExpr { expr: expr.clone() }),
87            }),
88            EXPLAIN_NODE_NAME => Some(NodeExtension {
89                expr: Arc::new(ExplainExpr { expr: expr.clone() }),
90            }),
91            EXPLAIN_VERBOSE_NODE_NAME => Some(NodeExtension {
92                expr: Arc::new(ExplainVerboseExpr { expr: expr.clone() }),
93            }),
94            _ => None,
95        }
96    }
97}
98
99#[derive(Debug, Clone, PartialEq, Eq)]
100pub struct PromQuery {
101    pub query: String,
102    pub start: String,
103    pub end: String,
104    pub step: String,
105    pub lookback: String,
106    pub alias: Option<String>,
107}
108
109impl Default for PromQuery {
110    fn default() -> Self {
111        PromQuery {
112            query: String::new(),
113            start: String::from("0"),
114            end: String::from("0"),
115            step: String::from("5m"),
116            lookback: String::from(DEFAULT_LOOKBACK_STRING),
117            alias: None,
118        }
119    }
120}
121
122/// Query language parser, supports parsing SQL and PromQL
123pub struct QueryLanguageParser {}
124
125impl QueryLanguageParser {
126    /// Try to parse SQL with GreptimeDB dialect, return the statement when success.
127    pub fn parse_sql(sql: &str, _query_ctx: &QueryContextRef) -> Result<QueryStatement> {
128        let _timer = PARSE_SQL_ELAPSED.start_timer();
129        let mut statement =
130            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
131                .map_err(BoxedError::new)
132                .context(QueryParseSnafu { query: sql })?;
133        if statement.len() != 1 {
134            MultipleStatementsSnafu {
135                query: sql.to_string(),
136            }
137            .fail()
138        } else {
139            Ok(QueryStatement::Sql(statement.pop().unwrap()))
140        }
141    }
142
143    /// Try to parse PromQL, return the statement when success.
144    #[tracing::instrument(skip_all)]
145    pub fn parse_promql(query: &PromQuery, _query_ctx: &QueryContextRef) -> Result<QueryStatement> {
146        let _timer = PARSE_PROMQL_ELAPSED.start_timer();
147
148        let expr = promql_parser::parser::parse(&query.query)
149            .map_err(|msg| BoxedError::new(PlainError::new(msg, StatusCode::InvalidArguments)))
150            .context(QueryParseSnafu {
151                query: &query.query,
152            })?;
153
154        let start = Self::parse_promql_timestamp(&query.start)
155            .map_err(BoxedError::new)
156            .context(QueryParseSnafu {
157                query: &query.query,
158            })?;
159
160        let end = Self::parse_promql_timestamp(&query.end)
161            .map_err(BoxedError::new)
162            .context(QueryParseSnafu {
163                query: &query.query,
164            })?;
165
166        let step = query
167            .step
168            .parse::<u64>()
169            .map(Duration::from_secs)
170            .or_else(|_| promql_parser::util::parse_duration(&query.step))
171            .map_err(|msg| BoxedError::new(PlainError::new(msg, StatusCode::InvalidArguments)))
172            .context(QueryParseSnafu {
173                query: &query.query,
174            })?;
175
176        let lookback_delta = query
177            .lookback
178            .parse::<u64>()
179            .map(Duration::from_secs)
180            .or_else(|_| promql_parser::util::parse_duration(&query.lookback))
181            .map_err(|msg| BoxedError::new(PlainError::new(msg, StatusCode::InvalidArguments)))
182            .context(QueryParseSnafu {
183                query: &query.query,
184            })?;
185
186        let eval_stmt = EvalStmt {
187            expr,
188            start,
189            end,
190            interval: step,
191            lookback_delta,
192        };
193
194        Ok(QueryStatement::Promql(eval_stmt, query.alias.clone()))
195    }
196
197    pub fn parse_promql_timestamp(timestamp: &str) -> Result<SystemTime> {
198        // try rfc3339 format
199        let rfc3339_result = DateTime::parse_from_rfc3339(timestamp)
200            .context(ParseTimestampSnafu { raw: timestamp })
201            .map(Into::<SystemTime>::into);
202
203        // shorthand
204        if rfc3339_result.is_ok() {
205            return rfc3339_result;
206        }
207
208        // try float format
209        let secs = timestamp
210            .parse::<f64>()
211            .context(ParseFloatSnafu { raw: timestamp })
212            // also report rfc3339 error if float parsing fails
213            .map_err(|_| rfc3339_result.unwrap_err())?;
214
215        let duration =
216            Duration::try_from_secs_f64(secs).context(TryIntoDurationSnafu { raw: timestamp })?;
217        SystemTime::UNIX_EPOCH
218            .checked_add(duration)
219            .context(AddSystemTimeOverflowSnafu { duration })
220    }
221}
222
223macro_rules! define_node_ast_extension {
224    ($name:ident, $name_expr:ident, $expr_type:ty, $extension_name:expr) => {
225        /// The implementation of the `$name_expr` extension AST node
226        #[derive(Debug, Clone)]
227        pub struct $name_expr {
228            pub expr: $expr_type,
229        }
230
231        impl ExtensionExpr for $name_expr {
232            fn as_any(&self) -> &dyn Any {
233                self
234            }
235
236            fn name(&self) -> &str {
237                $extension_name
238            }
239
240            fn value_type(&self) -> ValueType {
241                self.expr.value_type()
242            }
243
244            fn children(&self) -> &[Expr] {
245                std::slice::from_ref(&self.expr)
246            }
247        }
248
249        #[allow(rustdoc::broken_intra_doc_links)]
250        #[derive(Debug, Clone)]
251        pub struct $name {
252            pub expr: Arc<$name_expr>,
253        }
254
255        impl $name {
256            pub fn new(expr: $expr_type) -> Self {
257                Self {
258                    expr: Arc::new($name_expr { expr }),
259                }
260            }
261        }
262    };
263}
264
265define_node_ast_extension!(Analyze, AnalyzeExpr, Expr, ANALYZE_NODE_NAME);
266define_node_ast_extension!(
267    AnalyzeVerbose,
268    AnalyzeVerboseExpr,
269    Expr,
270    ANALYZE_VERBOSE_NODE_NAME
271);
272define_node_ast_extension!(Explain, ExplainExpr, Expr, EXPLAIN_NODE_NAME);
273define_node_ast_extension!(
274    ExplainVerbose,
275    ExplainVerboseExpr,
276    Expr,
277    EXPLAIN_VERBOSE_NODE_NAME
278);
279
280#[cfg(test)]
281mod test {
282    use session::context::QueryContext;
283
284    use super::*;
285
286    // Detailed logic tests are covered in the parser crate.
287    #[test]
288    fn parse_sql_simple() {
289        let sql = "select * from t1";
290        let stmt = QueryLanguageParser::parse_sql(sql, &QueryContext::arc()).unwrap();
291        let QueryStatement::Sql(sql_stmt) = stmt else {
292            panic!("Expected SQL statement, got {:?}", stmt);
293        };
294        assert_eq!("SELECT * FROM t1", sql_stmt.to_string());
295    }
296
297    #[test]
298    fn parse_promql_timestamp() {
299        let cases = vec![
300            (
301                "1435781451.781",
302                SystemTime::UNIX_EPOCH
303                    .checked_add(Duration::from_secs_f64(1435781451.781))
304                    .unwrap(),
305            ),
306            ("0.000", SystemTime::UNIX_EPOCH),
307            ("00", SystemTime::UNIX_EPOCH),
308            (
309                "2015-07-01T20:10:51.781Z",
310                SystemTime::UNIX_EPOCH
311                    .checked_add(Duration::from_secs_f64(1435781451.781))
312                    .unwrap(),
313            ),
314            ("1970-01-01T00:00:00.000Z", SystemTime::UNIX_EPOCH),
315        ];
316
317        for (input, expected) in cases {
318            let result = QueryLanguageParser::parse_promql_timestamp(input).unwrap();
319
320            let result = result
321                .duration_since(SystemTime::UNIX_EPOCH)
322                .unwrap()
323                .as_millis();
324            let expected = expected
325                .duration_since(SystemTime::UNIX_EPOCH)
326                .unwrap()
327                .as_millis();
328
329            // assert difference < 0.1 second
330            assert!(result.abs_diff(expected) < 100);
331        }
332
333        // i64::MAX + 1
334        let timestamp = "9223372036854775808.000";
335        let result = QueryLanguageParser::parse_promql_timestamp(timestamp);
336        assert_eq!(
337            result.unwrap_err().to_string(),
338            "Failed to add duration '9223372036854775808s' to SystemTime, overflowed"
339        );
340    }
341
342    #[test]
343    fn parse_promql_simple() {
344        let promql = PromQuery {
345            query: "http_request".to_string(),
346            start: "2022-02-13T17:14:00Z".to_string(),
347            end: "2023-02-13T17:14:00Z".to_string(),
348            step: "1d".to_string(),
349            lookback: "5m".to_string(),
350            alias: Some("my_query".to_string()),
351        };
352
353        #[cfg(not(windows))]
354        let expected = String::from(
355            "\
356            Promql(EvalStmt { \
357                expr: VectorSelector(VectorSelector { \
358                    name: Some(\"http_request\"), \
359                    matchers: Matchers { matchers: [], or_matchers: [] }, \
360                    offset: None, at: None }), \
361                start: SystemTime { tv_sec: 1644772440, tv_nsec: 0 }, \
362                end: SystemTime { tv_sec: 1676308440, tv_nsec: 0 }, \
363                interval: 86400s, \
364                lookback_delta: 300s \
365            }, Some(\"my_query\"))",
366        );
367
368        // Windows has different debug output for SystemTime.
369        #[cfg(windows)]
370        let expected = String::from(
371            "\
372            Promql(EvalStmt { \
373                expr: VectorSelector(VectorSelector { \
374                    name: Some(\"http_request\"), \
375                    matchers: Matchers { matchers: [], or_matchers: [] }, \
376                    offset: None, at: None }), \
377                start: SystemTime { intervals: 132892460400000000 }, \
378                end: SystemTime { intervals: 133207820400000000 }, \
379                interval: 86400s, \
380                lookback_delta: 300s \
381            }, Some(\"my_query\"))",
382        );
383
384        let result = QueryLanguageParser::parse_promql(&promql, &QueryContext::arc()).unwrap();
385        assert_eq!(format!("{result:?}"), expected);
386    }
387}