1use std::any::Any;
16use std::collections::HashMap;
17use std::sync::Arc;
18use std::time::{Duration, SystemTime};
19
20use chrono::DateTime;
21use common_error::ext::{BoxedError, PlainError};
22use common_error::status_code::StatusCode;
23use common_telemetry::tracing;
24use promql_parser::parser::Expr::Extension;
25use promql_parser::parser::ast::{Extension as NodeExtension, ExtensionExpr};
26use promql_parser::parser::value::ValueType;
27use promql_parser::parser::{EvalStmt, Expr};
28use session::context::QueryContextRef;
29use snafu::{OptionExt, ResultExt};
30use sql::dialect::GreptimeDbDialect;
31use sql::parser::{ParseOptions, ParserContext};
32use sql::statements::statement::Statement;
33
34use crate::error::{
35 AddSystemTimeOverflowSnafu, MultipleStatementsSnafu, ParseFloatSnafu, ParseTimestampSnafu,
36 QueryParseSnafu, Result, TryIntoDurationSnafu, UnimplementedSnafu,
37};
38use crate::metrics::{PARSE_PROMQL_ELAPSED, PARSE_SQL_ELAPSED};
39
/// Lookback delta, written as a PromQL duration string, used by [`PromQuery::default`].
pub const DEFAULT_LOOKBACK_STRING: &str = "5m";
/// Name reported by [`ExplainExpr`] and accepted by [`QueryStatement::post_process`].
pub const EXPLAIN_NODE_NAME: &str = "EXPLAIN";
/// Name reported by [`ExplainVerboseExpr`] and accepted by [`QueryStatement::post_process`].
pub const EXPLAIN_VERBOSE_NODE_NAME: &str = "EXPLAIN VERBOSE";
/// Name reported by [`AnalyzeExpr`] and accepted by [`QueryStatement::post_process`].
pub const ANALYZE_NODE_NAME: &str = "ANALYZE";
/// Name reported by [`AnalyzeVerboseExpr`] and accepted by [`QueryStatement::post_process`].
pub const ANALYZE_VERBOSE_NODE_NAME: &str = "ANALYZE VERBOSE";
/// Name reported by [`AliasExpr`].
pub const ALIAS_NODE_NAME: &str = "ALIAS";
46
47#[derive(Debug, Clone)]
48pub enum QueryStatement {
49 Sql(Statement),
50 Promql(EvalStmt, Option<String>),
52}
53
54impl QueryStatement {
55 pub fn post_process(&self, params: HashMap<String, String>) -> Result<QueryStatement> {
56 match self {
57 QueryStatement::Sql(_) => UnimplementedSnafu {
58 operation: "sql post process",
59 }
60 .fail(),
61 QueryStatement::Promql(eval_stmt, alias) => {
62 let node_name = match params.get("name") {
63 Some(name) => name.as_str(),
64 None => "",
65 };
66 let extension_node = Self::create_extension_node(node_name, &eval_stmt.expr);
67 Ok(QueryStatement::Promql(
68 EvalStmt {
69 expr: Extension(extension_node.unwrap()),
70 start: eval_stmt.start,
71 end: eval_stmt.end,
72 interval: eval_stmt.interval,
73 lookback_delta: eval_stmt.lookback_delta,
74 },
75 alias.clone(),
76 ))
77 }
78 }
79 }
80
81 fn create_extension_node(node_name: &str, expr: &Expr) -> Option<NodeExtension> {
82 match node_name {
83 ANALYZE_NODE_NAME => Some(NodeExtension {
84 expr: Arc::new(AnalyzeExpr { expr: expr.clone() }),
85 }),
86 ANALYZE_VERBOSE_NODE_NAME => Some(NodeExtension {
87 expr: Arc::new(AnalyzeVerboseExpr { expr: expr.clone() }),
88 }),
89 EXPLAIN_NODE_NAME => Some(NodeExtension {
90 expr: Arc::new(ExplainExpr { expr: expr.clone() }),
91 }),
92 EXPLAIN_VERBOSE_NODE_NAME => Some(NodeExtension {
93 expr: Arc::new(ExplainVerboseExpr { expr: expr.clone() }),
94 }),
95 _ => None,
96 }
97 }
98}
99
100#[derive(Debug, Clone, PartialEq, Eq)]
101pub struct PromQuery {
102 pub query: String,
103 pub start: String,
104 pub end: String,
105 pub step: String,
106 pub lookback: String,
107 pub alias: Option<String>,
108}
109
110impl Default for PromQuery {
111 fn default() -> Self {
112 PromQuery {
113 query: String::new(),
114 start: String::from("0"),
115 end: String::from("0"),
116 step: String::from("5m"),
117 lookback: String::from(DEFAULT_LOOKBACK_STRING),
118 alias: None,
119 }
120 }
121}
122
123pub struct QueryLanguageParser {}
125
126impl QueryLanguageParser {
127 pub fn parse_sql(sql: &str, _query_ctx: &QueryContextRef) -> Result<QueryStatement> {
129 let _timer = PARSE_SQL_ELAPSED.start_timer();
130 let mut statement =
131 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
132 .map_err(BoxedError::new)
133 .context(QueryParseSnafu { query: sql })?;
134 if statement.len() != 1 {
135 MultipleStatementsSnafu {
136 query: sql.to_string(),
137 }
138 .fail()
139 } else {
140 Ok(QueryStatement::Sql(statement.pop().unwrap()))
141 }
142 }
143
144 #[tracing::instrument(skip_all)]
146 pub fn parse_promql(query: &PromQuery, _query_ctx: &QueryContextRef) -> Result<QueryStatement> {
147 let _timer = PARSE_PROMQL_ELAPSED.start_timer();
148
149 let expr = promql_parser::parser::parse(&query.query)
150 .map_err(|msg| BoxedError::new(PlainError::new(msg, StatusCode::InvalidArguments)))
151 .context(QueryParseSnafu {
152 query: &query.query,
153 })?;
154
155 let start = Self::parse_promql_timestamp(&query.start)
156 .map_err(BoxedError::new)
157 .context(QueryParseSnafu {
158 query: &query.query,
159 })?;
160
161 let end = Self::parse_promql_timestamp(&query.end)
162 .map_err(BoxedError::new)
163 .context(QueryParseSnafu {
164 query: &query.query,
165 })?;
166
167 let step = query
168 .step
169 .parse::<u64>()
170 .map(Duration::from_secs)
171 .or_else(|_| promql_parser::util::parse_duration(&query.step))
172 .map_err(|msg| BoxedError::new(PlainError::new(msg, StatusCode::InvalidArguments)))
173 .context(QueryParseSnafu {
174 query: &query.query,
175 })?;
176
177 let lookback_delta = query
178 .lookback
179 .parse::<u64>()
180 .map(Duration::from_secs)
181 .or_else(|_| promql_parser::util::parse_duration(&query.lookback))
182 .map_err(|msg| BoxedError::new(PlainError::new(msg, StatusCode::InvalidArguments)))
183 .context(QueryParseSnafu {
184 query: &query.query,
185 })?;
186
187 let eval_stmt = EvalStmt {
188 expr,
189 start,
190 end,
191 interval: step,
192 lookback_delta,
193 };
194 if let Some(alias) = &query.alias {
195 let eval_stmt = Self::apply_alias_extension(eval_stmt, alias);
196 return Ok(QueryStatement::Promql(eval_stmt, query.alias.clone()));
197 }
198 Ok(QueryStatement::Promql(eval_stmt, None))
199 }
200
201 pub(crate) fn apply_alias_extension(mut eval_stmt: EvalStmt, alias: &str) -> EvalStmt {
202 eval_stmt.expr = Extension(NodeExtension {
203 expr: Arc::new(AliasExpr {
204 expr: eval_stmt.expr.clone(),
205 alias: alias.to_string(),
206 }),
207 });
208 eval_stmt
209 }
210
211 pub fn parse_promql_timestamp(timestamp: &str) -> Result<SystemTime> {
212 let rfc3339_result = DateTime::parse_from_rfc3339(timestamp)
214 .context(ParseTimestampSnafu { raw: timestamp })
215 .map(Into::<SystemTime>::into);
216
217 if rfc3339_result.is_ok() {
219 return rfc3339_result;
220 }
221
222 let secs = timestamp
224 .parse::<f64>()
225 .context(ParseFloatSnafu { raw: timestamp })
226 .map_err(|_| rfc3339_result.unwrap_err())?;
228
229 let duration =
230 Duration::try_from_secs_f64(secs).context(TryIntoDurationSnafu { raw: timestamp })?;
231 SystemTime::UNIX_EPOCH
232 .checked_add(duration)
233 .context(AddSystemTimeOverflowSnafu { duration })
234 }
235}
236
// Generates the two types that make up one single-child extension AST node:
//
//   * `$name_expr` holds the wrapped `$expr_type` and implements
//     `ExtensionExpr`, reporting `$extension_name` as its name.
//   * `$name` holds an `Arc<$name_expr>` and is built with `$name::new`.
//
// `children` returns `&[Expr]` borrowed from the `expr` field, so `$expr_type`
// has to be `Expr` for the expansion to type-check.
macro_rules! define_node_ast_extension {
    ($name:ident, $name_expr:ident, $expr_type:ty, $extension_name:expr) => {
        // The node stored inside the PromQL AST.
        #[derive(Debug, Clone)]
        pub struct $name_expr {
            pub expr: $expr_type,
        }

        impl ExtensionExpr for $name_expr {
            fn as_any(&self) -> &dyn Any {
                self
            }

            fn name(&self) -> &str {
                $extension_name
            }

            // The node has the value type of the expression it wraps.
            fn value_type(&self) -> ValueType {
                self.expr.value_type()
            }

            // The wrapped expression is the only child.
            fn children(&self) -> &[Expr] {
                std::slice::from_ref(&self.expr)
            }
        }

        // Shared-ownership wrapper around the node above.
        #[allow(rustdoc::broken_intra_doc_links)]
        #[derive(Debug, Clone)]
        pub struct $name {
            pub expr: Arc<$name_expr>,
        }

        impl $name {
            pub fn new(expr: $expr_type) -> Self {
                Self {
                    expr: Arc::new($name_expr { expr }),
                }
            }
        }
    };
}
278
// Extension nodes accepted by `QueryStatement::create_extension_node`; each
// one wraps a single `Expr` and reports the matching `*_NODE_NAME` constant.
define_node_ast_extension!(Analyze, AnalyzeExpr, Expr, ANALYZE_NODE_NAME);
define_node_ast_extension!(
    AnalyzeVerbose,
    AnalyzeVerboseExpr,
    Expr,
    ANALYZE_VERBOSE_NODE_NAME
);
define_node_ast_extension!(Explain, ExplainExpr, Expr, EXPLAIN_NODE_NAME);
define_node_ast_extension!(
    ExplainVerbose,
    ExplainVerboseExpr,
    Expr,
    EXPLAIN_VERBOSE_NODE_NAME
);
293#[derive(Debug, Clone)]
294pub struct AliasExpr {
295 pub expr: Expr,
296 pub alias: String,
297}
298impl ExtensionExpr for AliasExpr {
299 fn as_any(&self) -> &dyn Any {
300 self
301 }
302 fn name(&self) -> &str {
303 ALIAS_NODE_NAME
304 }
305 fn value_type(&self) -> ValueType {
306 self.expr.value_type()
307 }
308 fn children(&self) -> &[Expr] {
309 std::slice::from_ref(&self.expr)
310 }
311}
312#[derive(Debug, Clone)]
313pub struct Alias {
314 pub expr: Arc<AliasExpr>,
315}
316impl Alias {
317 pub fn new(expr: Expr, alias: String) -> Self {
318 Self {
319 expr: Arc::new(AliasExpr { expr, alias }),
320 }
321 }
322}
323
#[cfg(test)]
mod test {
    use session::context::QueryContext;

    use super::*;

    /// A single SELECT parses into `QueryStatement::Sql` and prints back in
    /// the parser's normalised form.
    #[test]
    fn parse_sql_simple() {
        let sql = "select * from t1";
        let stmt = QueryLanguageParser::parse_sql(sql, &QueryContext::arc()).unwrap();
        let QueryStatement::Sql(sql_stmt) = stmt else {
            panic!("Expected SQL statement, got {:?}", stmt);
        };
        assert_eq!("SELECT * FROM t1", sql_stmt.to_string());
    }

    /// Float-seconds and RFC 3339 inputs resolve to the same instants, and an
    /// out-of-range float surfaces the overflow error.
    #[test]
    fn parse_promql_timestamp() {
        let cases = vec![
            (
                "1435781451.781",
                SystemTime::UNIX_EPOCH
                    .checked_add(Duration::from_secs_f64(1435781451.781))
                    .unwrap(),
            ),
            ("0.000", SystemTime::UNIX_EPOCH),
            ("00", SystemTime::UNIX_EPOCH),
            (
                "2015-07-01T20:10:51.781Z",
                SystemTime::UNIX_EPOCH
                    .checked_add(Duration::from_secs_f64(1435781451.781))
                    .unwrap(),
            ),
            ("1970-01-01T00:00:00.000Z", SystemTime::UNIX_EPOCH),
        ];

        for (input, expected) in cases {
            let result = QueryLanguageParser::parse_promql_timestamp(input).unwrap();

            let result = result
                .duration_since(SystemTime::UNIX_EPOCH)
                .unwrap()
                .as_millis();
            let expected = expected
                .duration_since(SystemTime::UNIX_EPOCH)
                .unwrap()
                .as_millis();

            // Compare in milliseconds with a 100 ms tolerance rather than
            // requiring exact equality.
            assert!(result.abs_diff(expected) < 100);
        }

        // This value parses as a float and converts to a Duration, but adding
        // it to the Unix epoch overflows.
        let timestamp = "9223372036854775808.000";
        let result = QueryLanguageParser::parse_promql_timestamp(timestamp);
        assert_eq!(
            result.unwrap_err().to_string(),
            "Failed to add duration '9223372036854775808s' to SystemTime, overflowed"
        );
    }

    /// An aliased query parses into an `AliasExpr`-wrapped statement; checked
    /// through its Debug output.
    #[test]
    fn parse_promql_simple() {
        let promql = PromQuery {
            query: "http_request".to_string(),
            start: "2022-02-13T17:14:00Z".to_string(),
            end: "2023-02-13T17:14:00Z".to_string(),
            step: "1d".to_string(),
            lookback: "5m".to_string(),
            alias: Some("my_query".to_string()),
        };

        // The Debug output of `SystemTime` is platform specific, hence the two
        // expected strings below.
        #[cfg(not(windows))]
        let expected = String::from(
            "\
            Promql(EvalStmt { \
                expr: Extension(Extension { \
                    expr: AliasExpr { \
                        expr: VectorSelector(VectorSelector { \
                            name: Some(\"http_request\"), \
                            matchers: Matchers { matchers: [], or_matchers: [] }, \
                            offset: None, at: None \
                        }), \
                        alias: \"my_query\" \
                    } \
                }), \
                start: SystemTime { tv_sec: 1644772440, tv_nsec: 0 }, \
                end: SystemTime { tv_sec: 1676308440, tv_nsec: 0 }, \
                interval: 86400s, \
                lookback_delta: 300s \
            }, Some(\"my_query\"))",
        );

        #[cfg(windows)]
        let expected = String::from(
            "\
            Promql(EvalStmt { \
                expr: Extension(Extension { \
                    expr: AliasExpr { \
                        expr: VectorSelector(VectorSelector { \
                            name: Some(\"http_request\"), \
                            matchers: Matchers { matchers: [], or_matchers: [] }, \
                            offset: None, at: None \
                        }), \
                        alias: \"my_query\" \
                    } \
                }), \
                start: SystemTime { intervals: 132892460400000000 }, \
                end: SystemTime { intervals: 133207820400000000 }, \
                interval: 86400s, \
                lookback_delta: 300s \
            }, Some(\"my_query\"))",
        );

        let result = QueryLanguageParser::parse_promql(&promql, &QueryContext::arc()).unwrap();
        assert_eq!(format!("{result:?}"), expected);
    }
}