1use std::any::Any;
16use std::collections::HashMap;
17use std::sync::Arc;
18use std::time::{Duration, SystemTime};
19
20use chrono::DateTime;
21use common_error::ext::{BoxedError, PlainError};
22use common_error::status_code::StatusCode;
23use common_telemetry::tracing;
24use promql_parser::parser::ast::{Extension as NodeExtension, ExtensionExpr};
25use promql_parser::parser::value::ValueType;
26use promql_parser::parser::Expr::Extension;
27use promql_parser::parser::{EvalStmt, Expr};
28use session::context::QueryContextRef;
29use snafu::{OptionExt, ResultExt};
30use sql::dialect::GreptimeDbDialect;
31use sql::parser::{ParseOptions, ParserContext};
32use sql::statements::statement::Statement;
33
34use crate::error::{
35 AddSystemTimeOverflowSnafu, MultipleStatementsSnafu, ParseFloatSnafu, ParseTimestampSnafu,
36 QueryParseSnafu, Result, UnimplementedSnafu,
37};
38use crate::metrics::{PARSE_PROMQL_ELAPSED, PARSE_SQL_ELAPSED};
39
/// Lookback value used by `PromQuery::default()`.
pub const DEFAULT_LOOKBACK_STRING: &str = "5m";
/// Name of the `EXPLAIN` extension node.
pub const EXPLAIN_NODE_NAME: &str = "EXPLAIN";
/// Name of the `EXPLAIN VERBOSE` extension node.
pub const EXPLAIN_VERBOSE_NODE_NAME: &str = "EXPLAIN VERBOSE";
/// Name of the `ANALYZE` extension node.
pub const ANALYZE_NODE_NAME: &str = "ANALYZE";
/// Name of the `ANALYZE VERBOSE` extension node.
pub const ANALYZE_VERBOSE_NODE_NAME: &str = "ANALYZE VERBOSE";
45
46#[derive(Debug, Clone)]
47pub enum QueryStatement {
48 Sql(Statement),
49 Promql(EvalStmt),
50}
51
52impl QueryStatement {
53 pub fn post_process(&self, params: HashMap<String, String>) -> Result<QueryStatement> {
54 match self {
55 QueryStatement::Sql(_) => UnimplementedSnafu {
56 operation: "sql post process",
57 }
58 .fail(),
59 QueryStatement::Promql(eval_stmt) => {
60 let node_name = match params.get("name") {
61 Some(name) => name.as_str(),
62 None => "",
63 };
64 let extension_node = Self::create_extension_node(node_name, &eval_stmt.expr);
65 Ok(QueryStatement::Promql(EvalStmt {
66 expr: Extension(extension_node.unwrap()),
67 start: eval_stmt.start,
68 end: eval_stmt.end,
69 interval: eval_stmt.interval,
70 lookback_delta: eval_stmt.lookback_delta,
71 }))
72 }
73 }
74 }
75
76 fn create_extension_node(node_name: &str, expr: &Expr) -> Option<NodeExtension> {
77 match node_name {
78 ANALYZE_NODE_NAME => Some(NodeExtension {
79 expr: Arc::new(AnalyzeExpr { expr: expr.clone() }),
80 }),
81 ANALYZE_VERBOSE_NODE_NAME => Some(NodeExtension {
82 expr: Arc::new(AnalyzeVerboseExpr { expr: expr.clone() }),
83 }),
84 EXPLAIN_NODE_NAME => Some(NodeExtension {
85 expr: Arc::new(ExplainExpr { expr: expr.clone() }),
86 }),
87 EXPLAIN_VERBOSE_NODE_NAME => Some(NodeExtension {
88 expr: Arc::new(ExplainVerboseExpr { expr: expr.clone() }),
89 }),
90 _ => None,
91 }
92 }
93}
94
95#[derive(Debug, Clone, PartialEq, Eq)]
96pub struct PromQuery {
97 pub query: String,
98 pub start: String,
99 pub end: String,
100 pub step: String,
101 pub lookback: String,
102}
103
104impl Default for PromQuery {
105 fn default() -> Self {
106 PromQuery {
107 query: String::new(),
108 start: String::from("0"),
109 end: String::from("0"),
110 step: String::from("5m"),
111 lookback: String::from(DEFAULT_LOOKBACK_STRING),
112 }
113 }
114}
115
116pub struct QueryLanguageParser {}
118
119impl QueryLanguageParser {
120 pub fn parse_sql(sql: &str, _query_ctx: &QueryContextRef) -> Result<QueryStatement> {
122 let _timer = PARSE_SQL_ELAPSED.start_timer();
123 let mut statement =
124 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
125 .map_err(BoxedError::new)
126 .context(QueryParseSnafu { query: sql })?;
127 if statement.len() != 1 {
128 MultipleStatementsSnafu {
129 query: sql.to_string(),
130 }
131 .fail()
132 } else {
133 Ok(QueryStatement::Sql(statement.pop().unwrap()))
134 }
135 }
136
137 #[tracing::instrument(skip_all)]
139 pub fn parse_promql(query: &PromQuery, _query_ctx: &QueryContextRef) -> Result<QueryStatement> {
140 let _timer = PARSE_PROMQL_ELAPSED.start_timer();
141
142 let expr = promql_parser::parser::parse(&query.query)
143 .map_err(|msg| BoxedError::new(PlainError::new(msg, StatusCode::InvalidArguments)))
144 .context(QueryParseSnafu {
145 query: &query.query,
146 })?;
147
148 let start = Self::parse_promql_timestamp(&query.start)
149 .map_err(BoxedError::new)
150 .context(QueryParseSnafu {
151 query: &query.query,
152 })?;
153
154 let end = Self::parse_promql_timestamp(&query.end)
155 .map_err(BoxedError::new)
156 .context(QueryParseSnafu {
157 query: &query.query,
158 })?;
159
160 let step = query
161 .step
162 .parse::<u64>()
163 .map(Duration::from_secs)
164 .or_else(|_| promql_parser::util::parse_duration(&query.step))
165 .map_err(|msg| BoxedError::new(PlainError::new(msg, StatusCode::InvalidArguments)))
166 .context(QueryParseSnafu {
167 query: &query.query,
168 })?;
169
170 let lookback_delta = query
171 .lookback
172 .parse::<u64>()
173 .map(Duration::from_secs)
174 .or_else(|_| promql_parser::util::parse_duration(&query.lookback))
175 .map_err(|msg| BoxedError::new(PlainError::new(msg, StatusCode::InvalidArguments)))
176 .context(QueryParseSnafu {
177 query: &query.query,
178 })?;
179
180 let eval_stmt = EvalStmt {
181 expr,
182 start,
183 end,
184 interval: step,
185 lookback_delta,
186 };
187
188 Ok(QueryStatement::Promql(eval_stmt))
189 }
190
191 pub fn parse_promql_timestamp(timestamp: &str) -> Result<SystemTime> {
192 let rfc3339_result = DateTime::parse_from_rfc3339(timestamp)
194 .context(ParseTimestampSnafu { raw: timestamp })
195 .map(Into::<SystemTime>::into);
196
197 if rfc3339_result.is_ok() {
199 return rfc3339_result;
200 }
201
202 let secs = timestamp
204 .parse::<f64>()
205 .context(ParseFloatSnafu { raw: timestamp })
206 .map_err(|_| rfc3339_result.unwrap_err())?;
208
209 let duration = Duration::from_secs_f64(secs);
210 SystemTime::UNIX_EPOCH
211 .checked_add(duration)
212 .context(AddSystemTimeOverflowSnafu { duration })
213 }
214}
215
/// Generates a pair of types for a PromQL AST extension node.
///
/// * `$name_expr` holds the wrapped expression of type `$expr_type` and implements
///   `ExtensionExpr`, reporting `$extension_name` as its name.
/// * `$name` holds an `Arc<$name_expr>` and offers a `new` constructor taking the expression.
macro_rules! define_node_ast_extension {
    ($name:ident, $name_expr:ident, $expr_type:ty, $extension_name:expr) => {
        // The extension expression itself: a thin wrapper around the inner expression.
        #[derive(Debug, Clone)]
        pub struct $name_expr {
            pub expr: $expr_type,
        }

        impl ExtensionExpr for $name_expr {
            fn as_any(&self) -> &dyn Any {
                self
            }

            fn name(&self) -> &str {
                $extension_name
            }

            // The node has the value type of the expression it wraps.
            fn value_type(&self) -> ValueType {
                self.expr.value_type()
            }

            // The wrapped expression is the only child.
            fn children(&self) -> &[Expr] {
                std::slice::from_ref(&self.expr)
            }
        }

        // Shared handle to the extension expression.
        #[allow(rustdoc::broken_intra_doc_links)]
        #[derive(Debug, Clone)]
        pub struct $name {
            pub expr: Arc<$name_expr>,
        }

        impl $name {
            pub fn new(expr: $expr_type) -> Self {
                Self {
                    expr: Arc::new($name_expr { expr }),
                }
            }
        }
    };
}
257
// One extension node per supported name; each one wraps a plain `Expr`.
define_node_ast_extension!(Analyze, AnalyzeExpr, Expr, ANALYZE_NODE_NAME);
define_node_ast_extension!(
    AnalyzeVerbose,
    AnalyzeVerboseExpr,
    Expr,
    ANALYZE_VERBOSE_NODE_NAME
);
define_node_ast_extension!(Explain, ExplainExpr, Expr, EXPLAIN_NODE_NAME);
define_node_ast_extension!(
    ExplainVerbose,
    ExplainVerboseExpr,
    Expr,
    EXPLAIN_VERBOSE_NODE_NAME
);
272
#[cfg(test)]
mod test {
    use session::context::QueryContext;

    use super::*;

    // A single SQL statement parses and renders back in normalized form.
    #[test]
    fn parse_sql_simple() {
        let sql = "select * from t1";
        let stmt = QueryLanguageParser::parse_sql(sql, &QueryContext::arc()).unwrap();
        let QueryStatement::Sql(sql_stmt) = stmt else {
            panic!("Expected SQL statement, got {:?}", stmt);
        };
        assert_eq!("SELECT * FROM t1", sql_stmt.to_string());
    }

    // Both the float-seconds and the RFC3339 timestamp forms are accepted, and a value too large
    // for `SystemTime` is reported as an overflow error.
    #[test]
    fn parse_promql_timestamp() {
        let cases = vec![
            (
                "1435781451.781",
                SystemTime::UNIX_EPOCH
                    .checked_add(Duration::from_secs_f64(1435781451.781))
                    .unwrap(),
            ),
            ("0.000", SystemTime::UNIX_EPOCH),
            ("00", SystemTime::UNIX_EPOCH),
            (
                "2015-07-01T20:10:51.781Z",
                SystemTime::UNIX_EPOCH
                    .checked_add(Duration::from_secs_f64(1435781451.781))
                    .unwrap(),
            ),
            ("1970-01-01T00:00:00.000Z", SystemTime::UNIX_EPOCH),
        ];

        for (input, expected) in cases {
            let result = QueryLanguageParser::parse_promql_timestamp(input).unwrap();

            let result = result
                .duration_since(SystemTime::UNIX_EPOCH)
                .unwrap()
                .as_millis();
            let expected = expected
                .duration_since(SystemTime::UNIX_EPOCH)
                .unwrap()
                .as_millis();

            // Compare in milliseconds with a tolerance of under 100 ms.
            assert!(result.abs_diff(expected) < 100);
        }

        // 2^63 seconds: the test expects the add-to-epoch overflow error for this value.
        let timestamp = "9223372036854775808.000";
        let result = QueryLanguageParser::parse_promql_timestamp(timestamp);
        assert_eq!(
            result.unwrap_err().to_string(),
            "Failed to add duration '9223372036854775808s' to SystemTime, overflowed"
        );
    }

    // A full PromQL query parses into the expected `EvalStmt`, compared via its debug output.
    #[test]
    fn parse_promql_simple() {
        let promql = PromQuery {
            query: "http_request".to_string(),
            start: "2022-02-13T17:14:00Z".to_string(),
            end: "2023-02-13T17:14:00Z".to_string(),
            step: "1d".to_string(),
            lookback: "5m".to_string(),
        };

        #[cfg(not(windows))]
        let expected = String::from(
            "\
            Promql(EvalStmt { \
                expr: VectorSelector(VectorSelector { \
                    name: Some(\"http_request\"), \
                    matchers: Matchers { matchers: [], or_matchers: [] }, \
                    offset: None, at: None }), \
                start: SystemTime { tv_sec: 1644772440, tv_nsec: 0 }, \
                end: SystemTime { tv_sec: 1676308440, tv_nsec: 0 }, \
                interval: 86400s, \
                lookback_delta: 300s \
            })",
        );

        // The debug form of `SystemTime` differs on Windows, so that target has its own string.
        #[cfg(windows)]
        let expected = String::from(
            "\
            Promql(EvalStmt { \
                expr: VectorSelector(VectorSelector { \
                    name: Some(\"http_request\"), \
                    matchers: Matchers { matchers: [], or_matchers: [] }, \
                    offset: None, at: None }), \
                start: SystemTime { intervals: 132892460400000000 }, \
                end: SystemTime { intervals: 133207820400000000 }, \
                interval: 86400s, \
                lookback_delta: 300s \
            })",
        );

        let result = QueryLanguageParser::parse_promql(&promql, &QueryContext::arc()).unwrap();
        assert_eq!(format!("{result:?}"), expected);
    }
}