1use std::any::Any;
16use std::collections::HashMap;
17use std::sync::Arc;
18use std::time::{Duration, SystemTime};
19
20use chrono::DateTime;
21use common_error::ext::{BoxedError, PlainError};
22use common_error::status_code::StatusCode;
23use common_telemetry::tracing;
24use promql_parser::parser::Expr::Extension;
25use promql_parser::parser::ast::{Extension as NodeExtension, ExtensionExpr};
26use promql_parser::parser::value::ValueType;
27use promql_parser::parser::{EvalStmt, Expr};
28use session::context::QueryContextRef;
29use snafu::{OptionExt, ResultExt};
30use sql::dialect::GreptimeDbDialect;
31use sql::parser::{ParseOptions, ParserContext};
32use sql::statements::statement::Statement;
33
34use crate::error::{
35 AddSystemTimeOverflowSnafu, MultipleStatementsSnafu, ParseFloatSnafu, ParseTimestampSnafu,
36 QueryParseSnafu, Result, TryIntoDurationSnafu, UnimplementedSnafu,
37};
38use crate::metrics::{PARSE_PROMQL_ELAPSED, PARSE_SQL_ELAPSED};
39
/// Lookback window, as a duration string, that `PromQuery::default()` uses.
pub const DEFAULT_LOOKBACK_STRING: &str = "5m";
/// Name that selects (and is reported by) the `ExplainExpr` extension node.
pub const EXPLAIN_NODE_NAME: &str = "EXPLAIN";
/// Name that selects (and is reported by) the `ExplainVerboseExpr` extension node.
pub const EXPLAIN_VERBOSE_NODE_NAME: &str = "EXPLAIN VERBOSE";
/// Name that selects (and is reported by) the `AnalyzeExpr` extension node.
pub const ANALYZE_NODE_NAME: &str = "ANALYZE";
/// Name that selects (and is reported by) the `AnalyzeVerboseExpr` extension node.
pub const ANALYZE_VERBOSE_NODE_NAME: &str = "ANALYZE VERBOSE";
45
/// A parsed query in one of the two supported query languages.
#[derive(Debug, Clone)]
pub enum QueryStatement {
    /// A single statement produced by the SQL parser.
    Sql(Statement),
    /// A PromQL expression together with its evaluation parameters
    /// (start, end, interval and lookback delta).
    Promql(EvalStmt),
}
51
52impl QueryStatement {
53 pub fn post_process(&self, params: HashMap<String, String>) -> Result<QueryStatement> {
54 match self {
55 QueryStatement::Sql(_) => UnimplementedSnafu {
56 operation: "sql post process",
57 }
58 .fail(),
59 QueryStatement::Promql(eval_stmt) => {
60 let node_name = match params.get("name") {
61 Some(name) => name.as_str(),
62 None => "",
63 };
64 let extension_node = Self::create_extension_node(node_name, &eval_stmt.expr);
65 Ok(QueryStatement::Promql(EvalStmt {
66 expr: Extension(extension_node.unwrap()),
67 start: eval_stmt.start,
68 end: eval_stmt.end,
69 interval: eval_stmt.interval,
70 lookback_delta: eval_stmt.lookback_delta,
71 }))
72 }
73 }
74 }
75
76 fn create_extension_node(node_name: &str, expr: &Expr) -> Option<NodeExtension> {
77 match node_name {
78 ANALYZE_NODE_NAME => Some(NodeExtension {
79 expr: Arc::new(AnalyzeExpr { expr: expr.clone() }),
80 }),
81 ANALYZE_VERBOSE_NODE_NAME => Some(NodeExtension {
82 expr: Arc::new(AnalyzeVerboseExpr { expr: expr.clone() }),
83 }),
84 EXPLAIN_NODE_NAME => Some(NodeExtension {
85 expr: Arc::new(ExplainExpr { expr: expr.clone() }),
86 }),
87 EXPLAIN_VERBOSE_NODE_NAME => Some(NodeExtension {
88 expr: Arc::new(ExplainVerboseExpr { expr: expr.clone() }),
89 }),
90 _ => None,
91 }
92 }
93}
94
/// Raw, still-unparsed parameters of a PromQL query.
///
/// All fields are kept as strings; `QueryLanguageParser::parse_promql` is what
/// turns them into an `EvalStmt`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PromQuery {
    /// The PromQL expression text.
    pub query: String,
    /// Evaluation start, parsed by `QueryLanguageParser::parse_promql_timestamp`.
    pub start: String,
    /// Evaluation end, parsed by `QueryLanguageParser::parse_promql_timestamp`.
    pub end: String,
    /// Evaluation step: either an integer number of seconds or a PromQL duration string.
    pub step: String,
    /// Lookback delta: either an integer number of seconds or a PromQL duration string.
    pub lookback: String,
}
103
104impl Default for PromQuery {
105 fn default() -> Self {
106 PromQuery {
107 query: String::new(),
108 start: String::from("0"),
109 end: String::from("0"),
110 step: String::from("5m"),
111 lookback: String::from(DEFAULT_LOOKBACK_STRING),
112 }
113 }
114}
115
/// Stateless namespace for the SQL and PromQL parsing entry points
/// (`parse_sql`, `parse_promql` and `parse_promql_timestamp`).
pub struct QueryLanguageParser {}
118
119impl QueryLanguageParser {
120 pub fn parse_sql(sql: &str, _query_ctx: &QueryContextRef) -> Result<QueryStatement> {
122 let _timer = PARSE_SQL_ELAPSED.start_timer();
123 let mut statement =
124 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
125 .map_err(BoxedError::new)
126 .context(QueryParseSnafu { query: sql })?;
127 if statement.len() != 1 {
128 MultipleStatementsSnafu {
129 query: sql.to_string(),
130 }
131 .fail()
132 } else {
133 Ok(QueryStatement::Sql(statement.pop().unwrap()))
134 }
135 }
136
137 #[tracing::instrument(skip_all)]
139 pub fn parse_promql(query: &PromQuery, _query_ctx: &QueryContextRef) -> Result<QueryStatement> {
140 let _timer = PARSE_PROMQL_ELAPSED.start_timer();
141
142 let expr = promql_parser::parser::parse(&query.query)
143 .map_err(|msg| BoxedError::new(PlainError::new(msg, StatusCode::InvalidArguments)))
144 .context(QueryParseSnafu {
145 query: &query.query,
146 })?;
147
148 let start = Self::parse_promql_timestamp(&query.start)
149 .map_err(BoxedError::new)
150 .context(QueryParseSnafu {
151 query: &query.query,
152 })?;
153
154 let end = Self::parse_promql_timestamp(&query.end)
155 .map_err(BoxedError::new)
156 .context(QueryParseSnafu {
157 query: &query.query,
158 })?;
159
160 let step = query
161 .step
162 .parse::<u64>()
163 .map(Duration::from_secs)
164 .or_else(|_| promql_parser::util::parse_duration(&query.step))
165 .map_err(|msg| BoxedError::new(PlainError::new(msg, StatusCode::InvalidArguments)))
166 .context(QueryParseSnafu {
167 query: &query.query,
168 })?;
169
170 let lookback_delta = query
171 .lookback
172 .parse::<u64>()
173 .map(Duration::from_secs)
174 .or_else(|_| promql_parser::util::parse_duration(&query.lookback))
175 .map_err(|msg| BoxedError::new(PlainError::new(msg, StatusCode::InvalidArguments)))
176 .context(QueryParseSnafu {
177 query: &query.query,
178 })?;
179
180 let eval_stmt = EvalStmt {
181 expr,
182 start,
183 end,
184 interval: step,
185 lookback_delta,
186 };
187
188 Ok(QueryStatement::Promql(eval_stmt))
189 }
190
191 pub fn parse_promql_timestamp(timestamp: &str) -> Result<SystemTime> {
192 let rfc3339_result = DateTime::parse_from_rfc3339(timestamp)
194 .context(ParseTimestampSnafu { raw: timestamp })
195 .map(Into::<SystemTime>::into);
196
197 if rfc3339_result.is_ok() {
199 return rfc3339_result;
200 }
201
202 let secs = timestamp
204 .parse::<f64>()
205 .context(ParseFloatSnafu { raw: timestamp })
206 .map_err(|_| rfc3339_result.unwrap_err())?;
208
209 let duration =
210 Duration::try_from_secs_f64(secs).context(TryIntoDurationSnafu { raw: timestamp })?;
211 SystemTime::UNIX_EPOCH
212 .checked_add(duration)
213 .context(AddSystemTimeOverflowSnafu { duration })
214 }
215}
216
// Generates a pair of types for one PromQL AST extension node:
//
// * `$name_expr` holds the wrapped expression (of type `$expr_type`) and implements
//   `ExtensionExpr`, reporting `$extension_name` as its name.
// * `$name` is a wrapper holding an `Arc<$name_expr>`, with a `new` constructor.
macro_rules! define_node_ast_extension {
    ($name:ident, $name_expr:ident, $expr_type:ty, $extension_name:expr) => {
        // The extension expression itself: a single wrapped child expression.
        #[derive(Debug, Clone)]
        pub struct $name_expr {
            pub expr: $expr_type,
        }

        impl ExtensionExpr for $name_expr {
            fn as_any(&self) -> &dyn Any {
                self
            }

            fn name(&self) -> &str {
                $extension_name
            }

            // The node has the same value type as the expression it wraps.
            fn value_type(&self) -> ValueType {
                self.expr.value_type()
            }

            // The wrapped expression is the only child, exposed as a one-element slice.
            fn children(&self) -> &[Expr] {
                std::slice::from_ref(&self.expr)
            }
        }

        // Shared-ownership wrapper around `$name_expr`.
        #[allow(rustdoc::broken_intra_doc_links)]
        #[derive(Debug, Clone)]
        pub struct $name {
            pub expr: Arc<$name_expr>,
        }

        impl $name {
            pub fn new(expr: $expr_type) -> Self {
                Self {
                    expr: Arc::new($name_expr { expr }),
                }
            }
        }
    };
}
258
// Extension nodes for ANALYZE / EXPLAIN and their VERBOSE variants; each wraps a PromQL
// `Expr` and reports the matching `*_NODE_NAME` constant as its name.
define_node_ast_extension!(Analyze, AnalyzeExpr, Expr, ANALYZE_NODE_NAME);
define_node_ast_extension!(
    AnalyzeVerbose,
    AnalyzeVerboseExpr,
    Expr,
    ANALYZE_VERBOSE_NODE_NAME
);
define_node_ast_extension!(Explain, ExplainExpr, Expr, EXPLAIN_NODE_NAME);
define_node_ast_extension!(
    ExplainVerbose,
    ExplainVerboseExpr,
    Expr,
    EXPLAIN_VERBOSE_NODE_NAME
);
273
#[cfg(test)]
mod test {
    use session::context::QueryContext;

    use super::*;

    /// A single SQL statement parses into `QueryStatement::Sql` and prints normalized.
    #[test]
    fn parse_sql_simple() {
        let parsed =
            QueryLanguageParser::parse_sql("select * from t1", &QueryContext::arc()).unwrap();
        match parsed {
            QueryStatement::Sql(sql_stmt) => {
                assert_eq!("SELECT * FROM t1", sql_stmt.to_string());
            }
            other => panic!("Expected SQL statement, got {:?}", other),
        }
    }

    /// Float-second and RFC 3339 inputs map to the expected instants; an out-of-range
    /// float reports an overflow.
    #[test]
    fn parse_promql_timestamp() {
        let millis_since_epoch = |time: SystemTime| {
            time.duration_since(SystemTime::UNIX_EPOCH)
                .unwrap()
                .as_millis()
        };

        // 2015-07-01T20:10:51.781Z expressed as seconds since the Unix epoch.
        let july_2015 = SystemTime::UNIX_EPOCH
            .checked_add(Duration::from_secs_f64(1435781451.781))
            .unwrap();

        let cases = [
            ("1435781451.781", july_2015),
            ("0.000", SystemTime::UNIX_EPOCH),
            ("00", SystemTime::UNIX_EPOCH),
            ("2015-07-01T20:10:51.781Z", july_2015),
            ("1970-01-01T00:00:00.000Z", SystemTime::UNIX_EPOCH),
        ];

        for (input, expected) in cases {
            let actual = QueryLanguageParser::parse_promql_timestamp(input).unwrap();

            // Allow less than 100 ms of drift from float rounding.
            let drift = millis_since_epoch(actual).abs_diff(millis_since_epoch(expected));
            assert!(drift < 100);
        }

        // 2^63 seconds, i.e. one past i64::MAX.
        let overflowing = QueryLanguageParser::parse_promql_timestamp("9223372036854775808.000");
        assert_eq!(
            overflowing.unwrap_err().to_string(),
            "Failed to add duration '9223372036854775808s' to SystemTime, overflowed"
        );
    }

    /// A plain vector selector with RFC 3339 bounds parses into the expected `EvalStmt`.
    #[test]
    fn parse_promql_simple() {
        let promql = PromQuery {
            query: String::from("http_request"),
            start: String::from("2022-02-13T17:14:00Z"),
            end: String::from("2023-02-13T17:14:00Z"),
            step: String::from("1d"),
            lookback: String::from("5m"),
        };

        #[cfg(not(windows))]
        let expected = "\
            Promql(EvalStmt { \
                expr: VectorSelector(VectorSelector { \
                    name: Some(\"http_request\"), \
                    matchers: Matchers { matchers: [], or_matchers: [] }, \
                    offset: None, at: None }), \
                start: SystemTime { tv_sec: 1644772440, tv_nsec: 0 }, \
                end: SystemTime { tv_sec: 1676308440, tv_nsec: 0 }, \
                interval: 86400s, \
                lookback_delta: 300s \
            })";

        // `SystemTime` has a different `Debug` representation on Windows.
        #[cfg(windows)]
        let expected = "\
            Promql(EvalStmt { \
                expr: VectorSelector(VectorSelector { \
                    name: Some(\"http_request\"), \
                    matchers: Matchers { matchers: [], or_matchers: [] }, \
                    offset: None, at: None }), \
                start: SystemTime { intervals: 132892460400000000 }, \
                end: SystemTime { intervals: 133207820400000000 }, \
                interval: 86400s, \
                lookback_delta: 300s \
            })";

        let statement = QueryLanguageParser::parse_promql(&promql, &QueryContext::arc()).unwrap();
        assert_eq!(format!("{statement:?}"), expected);
    }
}