1use std::any::Any;
16use std::collections::HashMap;
17use std::sync::Arc;
18use std::time::{Duration, SystemTime};
19
20use chrono::DateTime;
21use common_error::ext::{BoxedError, PlainError};
22use common_error::status_code::StatusCode;
23use common_telemetry::tracing;
24use promql_parser::parser::Expr::Extension;
25use promql_parser::parser::ast::{Extension as NodeExtension, ExtensionExpr};
26use promql_parser::parser::value::ValueType;
27use promql_parser::parser::{EvalStmt, Expr};
28use session::context::QueryContextRef;
29use snafu::{OptionExt, ResultExt};
30use sql::dialect::GreptimeDbDialect;
31use sql::parser::{ParseOptions, ParserContext};
32use sql::statements::statement::Statement;
33
34use crate::error::{
35 AddSystemTimeOverflowSnafu, MultipleStatementsSnafu, ParseFloatSnafu, ParseTimestampSnafu,
36 QueryParseSnafu, Result, TryIntoDurationSnafu, UnimplementedSnafu,
37};
38use crate::metrics::{PARSE_PROMQL_ELAPSED, PARSE_SQL_ELAPSED};
39
40pub const DEFAULT_LOOKBACK_STRING: &str = "5m";
41pub const EXPLAIN_NODE_NAME: &str = "EXPLAIN";
42pub const EXPLAIN_VERBOSE_NODE_NAME: &str = "EXPLAIN VERBOSE";
43pub const ANALYZE_NODE_NAME: &str = "ANALYZE";
44pub const ANALYZE_VERBOSE_NODE_NAME: &str = "ANALYZE VERBOSE";
45
46#[derive(Debug, Clone)]
47pub enum QueryStatement {
48 Sql(Statement),
49 Promql(EvalStmt, Option<String>),
51}
52
53impl QueryStatement {
54 pub fn post_process(&self, params: HashMap<String, String>) -> Result<QueryStatement> {
55 match self {
56 QueryStatement::Sql(_) => UnimplementedSnafu {
57 operation: "sql post process",
58 }
59 .fail(),
60 QueryStatement::Promql(eval_stmt, alias) => {
61 let node_name = match params.get("name") {
62 Some(name) => name.as_str(),
63 None => "",
64 };
65 let extension_node = Self::create_extension_node(node_name, &eval_stmt.expr);
66 Ok(QueryStatement::Promql(
67 EvalStmt {
68 expr: Extension(extension_node.unwrap()),
69 start: eval_stmt.start,
70 end: eval_stmt.end,
71 interval: eval_stmt.interval,
72 lookback_delta: eval_stmt.lookback_delta,
73 },
74 alias.clone(),
75 ))
76 }
77 }
78 }
79
80 fn create_extension_node(node_name: &str, expr: &Expr) -> Option<NodeExtension> {
81 match node_name {
82 ANALYZE_NODE_NAME => Some(NodeExtension {
83 expr: Arc::new(AnalyzeExpr { expr: expr.clone() }),
84 }),
85 ANALYZE_VERBOSE_NODE_NAME => Some(NodeExtension {
86 expr: Arc::new(AnalyzeVerboseExpr { expr: expr.clone() }),
87 }),
88 EXPLAIN_NODE_NAME => Some(NodeExtension {
89 expr: Arc::new(ExplainExpr { expr: expr.clone() }),
90 }),
91 EXPLAIN_VERBOSE_NODE_NAME => Some(NodeExtension {
92 expr: Arc::new(ExplainVerboseExpr { expr: expr.clone() }),
93 }),
94 _ => None,
95 }
96 }
97}
98
99#[derive(Debug, Clone, PartialEq, Eq)]
100pub struct PromQuery {
101 pub query: String,
102 pub start: String,
103 pub end: String,
104 pub step: String,
105 pub lookback: String,
106 pub alias: Option<String>,
107}
108
109impl Default for PromQuery {
110 fn default() -> Self {
111 PromQuery {
112 query: String::new(),
113 start: String::from("0"),
114 end: String::from("0"),
115 step: String::from("5m"),
116 lookback: String::from(DEFAULT_LOOKBACK_STRING),
117 alias: None,
118 }
119 }
120}
121
122pub struct QueryLanguageParser {}
124
125impl QueryLanguageParser {
126 pub fn parse_sql(sql: &str, _query_ctx: &QueryContextRef) -> Result<QueryStatement> {
128 let _timer = PARSE_SQL_ELAPSED.start_timer();
129 let mut statement =
130 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
131 .map_err(BoxedError::new)
132 .context(QueryParseSnafu { query: sql })?;
133 if statement.len() != 1 {
134 MultipleStatementsSnafu {
135 query: sql.to_string(),
136 }
137 .fail()
138 } else {
139 Ok(QueryStatement::Sql(statement.pop().unwrap()))
140 }
141 }
142
143 #[tracing::instrument(skip_all)]
145 pub fn parse_promql(query: &PromQuery, _query_ctx: &QueryContextRef) -> Result<QueryStatement> {
146 let _timer = PARSE_PROMQL_ELAPSED.start_timer();
147
148 let expr = promql_parser::parser::parse(&query.query)
149 .map_err(|msg| BoxedError::new(PlainError::new(msg, StatusCode::InvalidArguments)))
150 .context(QueryParseSnafu {
151 query: &query.query,
152 })?;
153
154 let start = Self::parse_promql_timestamp(&query.start)
155 .map_err(BoxedError::new)
156 .context(QueryParseSnafu {
157 query: &query.query,
158 })?;
159
160 let end = Self::parse_promql_timestamp(&query.end)
161 .map_err(BoxedError::new)
162 .context(QueryParseSnafu {
163 query: &query.query,
164 })?;
165
166 let step = query
167 .step
168 .parse::<u64>()
169 .map(Duration::from_secs)
170 .or_else(|_| promql_parser::util::parse_duration(&query.step))
171 .map_err(|msg| BoxedError::new(PlainError::new(msg, StatusCode::InvalidArguments)))
172 .context(QueryParseSnafu {
173 query: &query.query,
174 })?;
175
176 let lookback_delta = query
177 .lookback
178 .parse::<u64>()
179 .map(Duration::from_secs)
180 .or_else(|_| promql_parser::util::parse_duration(&query.lookback))
181 .map_err(|msg| BoxedError::new(PlainError::new(msg, StatusCode::InvalidArguments)))
182 .context(QueryParseSnafu {
183 query: &query.query,
184 })?;
185
186 let eval_stmt = EvalStmt {
187 expr,
188 start,
189 end,
190 interval: step,
191 lookback_delta,
192 };
193
194 Ok(QueryStatement::Promql(eval_stmt, query.alias.clone()))
195 }
196
197 pub fn parse_promql_timestamp(timestamp: &str) -> Result<SystemTime> {
198 let rfc3339_result = DateTime::parse_from_rfc3339(timestamp)
200 .context(ParseTimestampSnafu { raw: timestamp })
201 .map(Into::<SystemTime>::into);
202
203 if rfc3339_result.is_ok() {
205 return rfc3339_result;
206 }
207
208 let secs = timestamp
210 .parse::<f64>()
211 .context(ParseFloatSnafu { raw: timestamp })
212 .map_err(|_| rfc3339_result.unwrap_err())?;
214
215 let duration =
216 Duration::try_from_secs_f64(secs).context(TryIntoDurationSnafu { raw: timestamp })?;
217 SystemTime::UNIX_EPOCH
218 .checked_add(duration)
219 .context(AddSystemTimeOverflowSnafu { duration })
220 }
221}
222
223macro_rules! define_node_ast_extension {
224 ($name:ident, $name_expr:ident, $expr_type:ty, $extension_name:expr) => {
225 #[derive(Debug, Clone)]
227 pub struct $name_expr {
228 pub expr: $expr_type,
229 }
230
231 impl ExtensionExpr for $name_expr {
232 fn as_any(&self) -> &dyn Any {
233 self
234 }
235
236 fn name(&self) -> &str {
237 $extension_name
238 }
239
240 fn value_type(&self) -> ValueType {
241 self.expr.value_type()
242 }
243
244 fn children(&self) -> &[Expr] {
245 std::slice::from_ref(&self.expr)
246 }
247 }
248
249 #[allow(rustdoc::broken_intra_doc_links)]
250 #[derive(Debug, Clone)]
251 pub struct $name {
252 pub expr: Arc<$name_expr>,
253 }
254
255 impl $name {
256 pub fn new(expr: $expr_type) -> Self {
257 Self {
258 expr: Arc::new($name_expr { expr }),
259 }
260 }
261 }
262 };
263}
264
265define_node_ast_extension!(Analyze, AnalyzeExpr, Expr, ANALYZE_NODE_NAME);
266define_node_ast_extension!(
267 AnalyzeVerbose,
268 AnalyzeVerboseExpr,
269 Expr,
270 ANALYZE_VERBOSE_NODE_NAME
271);
272define_node_ast_extension!(Explain, ExplainExpr, Expr, EXPLAIN_NODE_NAME);
273define_node_ast_extension!(
274 ExplainVerbose,
275 ExplainVerboseExpr,
276 Expr,
277 EXPLAIN_VERBOSE_NODE_NAME
278);
279
280#[cfg(test)]
281mod test {
282 use session::context::QueryContext;
283
284 use super::*;
285
286 #[test]
288 fn parse_sql_simple() {
289 let sql = "select * from t1";
290 let stmt = QueryLanguageParser::parse_sql(sql, &QueryContext::arc()).unwrap();
291 let QueryStatement::Sql(sql_stmt) = stmt else {
292 panic!("Expected SQL statement, got {:?}", stmt);
293 };
294 assert_eq!("SELECT * FROM t1", sql_stmt.to_string());
295 }
296
297 #[test]
298 fn parse_promql_timestamp() {
299 let cases = vec![
300 (
301 "1435781451.781",
302 SystemTime::UNIX_EPOCH
303 .checked_add(Duration::from_secs_f64(1435781451.781))
304 .unwrap(),
305 ),
306 ("0.000", SystemTime::UNIX_EPOCH),
307 ("00", SystemTime::UNIX_EPOCH),
308 (
309 "2015-07-01T20:10:51.781Z",
310 SystemTime::UNIX_EPOCH
311 .checked_add(Duration::from_secs_f64(1435781451.781))
312 .unwrap(),
313 ),
314 ("1970-01-01T00:00:00.000Z", SystemTime::UNIX_EPOCH),
315 ];
316
317 for (input, expected) in cases {
318 let result = QueryLanguageParser::parse_promql_timestamp(input).unwrap();
319
320 let result = result
321 .duration_since(SystemTime::UNIX_EPOCH)
322 .unwrap()
323 .as_millis();
324 let expected = expected
325 .duration_since(SystemTime::UNIX_EPOCH)
326 .unwrap()
327 .as_millis();
328
329 assert!(result.abs_diff(expected) < 100);
331 }
332
333 let timestamp = "9223372036854775808.000";
335 let result = QueryLanguageParser::parse_promql_timestamp(timestamp);
336 assert_eq!(
337 result.unwrap_err().to_string(),
338 "Failed to add duration '9223372036854775808s' to SystemTime, overflowed"
339 );
340 }
341
342 #[test]
343 fn parse_promql_simple() {
344 let promql = PromQuery {
345 query: "http_request".to_string(),
346 start: "2022-02-13T17:14:00Z".to_string(),
347 end: "2023-02-13T17:14:00Z".to_string(),
348 step: "1d".to_string(),
349 lookback: "5m".to_string(),
350 alias: Some("my_query".to_string()),
351 };
352
353 #[cfg(not(windows))]
354 let expected = String::from(
355 "\
356 Promql(EvalStmt { \
357 expr: VectorSelector(VectorSelector { \
358 name: Some(\"http_request\"), \
359 matchers: Matchers { matchers: [], or_matchers: [] }, \
360 offset: None, at: None }), \
361 start: SystemTime { tv_sec: 1644772440, tv_nsec: 0 }, \
362 end: SystemTime { tv_sec: 1676308440, tv_nsec: 0 }, \
363 interval: 86400s, \
364 lookback_delta: 300s \
365 }, Some(\"my_query\"))",
366 );
367
368 #[cfg(windows)]
370 let expected = String::from(
371 "\
372 Promql(EvalStmt { \
373 expr: VectorSelector(VectorSelector { \
374 name: Some(\"http_request\"), \
375 matchers: Matchers { matchers: [], or_matchers: [] }, \
376 offset: None, at: None }), \
377 start: SystemTime { intervals: 132892460400000000 }, \
378 end: SystemTime { intervals: 133207820400000000 }, \
379 interval: 86400s, \
380 lookback_delta: 300s \
381 }, Some(\"my_query\"))",
382 );
383
384 let result = QueryLanguageParser::parse_promql(&promql, &QueryContext::arc()).unwrap();
385 assert_eq!(format!("{result:?}"), expected);
386 }
387}