1use std::fmt;
16
17use serde::Serialize;
18use snafu::ResultExt;
19use sqlparser::ast::helpers::attached_token::AttachedToken;
20use sqlparser::ast::{
21 Cte, Ident, ObjectName, Query as SpQuery, TableAlias, TableAliasColumnDef, With,
22};
23use sqlparser::keywords::Keyword;
24use sqlparser::parser::IsOptional;
25use sqlparser::tokenizer::Token;
26use sqlparser_derive::{Visit, VisitMut};
27
28use crate::dialect::GreptimeDbDialect;
29use crate::error::{self, Result};
30use crate::parser::ParserContext;
31use crate::parsers::tql_parser;
32use crate::statements::query::Query;
33use crate::statements::statement::Statement;
34use crate::statements::tql::Tql;
35use crate::util::location_to_index;
36
/// Body of a single CTE inside a hybrid `WITH` clause.
///
/// A CTE body is either a regular SQL query or a TQL statement; the parser in
/// this file decides which by peeking at the first token of the body.
#[derive(Debug, Clone, PartialEq, Eq, Visit, VisitMut, Serialize)]
pub enum CteContent {
    /// A SQL query, stored as a boxed sqlparser `Query`.
    Sql(Box<SpQuery>),
    /// A TQL statement. The parser in this file only ever produces
    /// `Tql::Eval` here and rejects other TQL kinds.
    Tql(Tql),
}
43
/// One CTE definition of the form `name [(columns)] AS ( content )`, where the
/// content may be SQL or TQL.
#[derive(Debug, Clone, PartialEq, Eq, Visit, VisitMut, Serialize)]
pub struct HybridCte {
    /// Name of the CTE. The parser in this file stores the identifier after
    /// passing it through `canonicalize_identifier`.
    pub name: Ident,
    /// Column names listed after the CTE name; may be empty, in which case
    /// `Display` omits the parenthesized list.
    pub columns: Vec<ObjectName>,
    /// The body between the parentheses following `AS`.
    pub content: CteContent,
}
52
/// A `WITH` clause made of [`HybridCte`] entries.
///
/// As built by `parse_with_tql_with_now` in this file, only the TQL-bodied CTEs
/// end up here; SQL-bodied CTEs are moved into the inner query's regular
/// `with` clause instead.
#[derive(Debug, Clone, PartialEq, Eq, Visit, VisitMut, Serialize)]
pub struct HybridCteWith {
    /// Whether the `RECURSIVE` keyword followed `WITH`.
    pub recursive: bool,
    /// The CTE definitions, in the order they were pushed by the parser.
    pub cte_tables: Vec<HybridCte>,
}
59
60impl fmt::Display for HybridCteWith {
61 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
62 write!(f, "WITH ")?;
63
64 if self.recursive {
65 write!(f, "RECURSIVE ")?;
66 }
67
68 for (i, cte) in self.cte_tables.iter().enumerate() {
69 if i > 0 {
70 write!(f, ", ")?;
71 }
72 write!(f, "{}", cte.name)?;
73
74 if !cte.columns.is_empty() {
75 write!(f, " (")?;
76 for (j, col) in cte.columns.iter().enumerate() {
77 if j > 0 {
78 write!(f, ", ")?;
79 }
80 write!(f, "{}", col)?;
81 }
82 write!(f, ")")?;
83 }
84
85 write!(f, " AS (")?;
86 match &cte.content {
87 CteContent::Sql(query) => write!(f, "{}", query)?,
88 CteContent::Tql(tql) => write!(f, "{}", tql)?,
89 }
90 write!(f, ")")?;
91 }
92 Ok(())
93 }
94}
95
96impl ParserContext<'_> {
98 pub(crate) fn parse_with_tql(&mut self) -> Result<Statement> {
100 self.parse_with_tql_with_now(false)
101 }
102
103 pub(crate) fn parse_with_tql_with_now(&mut self, require_now_expr: bool) -> Result<Statement> {
104 self.parser
106 .expect_keyword(Keyword::WITH)
107 .context(error::SyntaxSnafu)?;
108
109 let recursive = self.parser.parse_keyword(Keyword::RECURSIVE);
111
112 let mut tql_cte_tables = Vec::new();
114 let mut sql_cte_tables = Vec::new();
115
116 loop {
117 let cte = self.parse_hybrid_cte(require_now_expr)?;
118 match cte.content {
119 CteContent::Sql(body) => sql_cte_tables.push(Cte {
120 alias: TableAlias {
121 explicit: false,
122 name: cte.name,
123 columns: cte
124 .columns
125 .into_iter()
126 .flat_map(|col| col.0[0].as_ident().cloned())
127 .map(|name| TableAliasColumnDef {
128 name,
129 data_type: None,
130 })
131 .collect(),
132 },
133 query: body,
134 from: None,
135 materialized: None,
136 closing_paren_token: AttachedToken::empty(),
137 }),
138 CteContent::Tql(_) => tql_cte_tables.push(cte),
139 }
140
141 if !self.parser.consume_token(&Token::Comma) {
142 break;
143 }
144 }
145
146 let main_query = self.parser.parse_query().context(error::SyntaxSnafu)?;
148
149 let mut query = Query::try_from(*main_query)?;
151 if !tql_cte_tables.is_empty() {
152 query.hybrid_cte = Some(HybridCteWith {
153 recursive,
154 cte_tables: tql_cte_tables,
155 });
156 }
157 query.inner.with = Some(With {
158 recursive,
159 cte_tables: sql_cte_tables,
160 with_token: AttachedToken::empty(),
161 });
162
163 Ok(Statement::Query(Box::new(query)))
164 }
165
166 fn parse_hybrid_cte(&mut self, require_now_expr: bool) -> Result<HybridCte> {
168 let name = self.parser.parse_identifier().context(error::SyntaxSnafu)?;
170 let name = Self::canonicalize_identifier(name);
171
172 let columns = self
174 .parser
175 .parse_parenthesized_qualified_column_list(IsOptional::Optional, true)
176 .context(error::SyntaxSnafu)?;
177
178 self.parser
180 .expect_keyword(Keyword::AS)
181 .context(error::SyntaxSnafu)?;
182
183 self.parser
185 .expect_token(&Token::LParen)
186 .context(error::SyntaxSnafu)?;
187
188 let content = self.parse_cte_content(require_now_expr)?;
189
190 self.parser
191 .expect_token(&Token::RParen)
192 .context(error::SyntaxSnafu)?;
193
194 Ok(HybridCte {
195 name,
196 columns,
197 content,
198 })
199 }
200
201 fn parse_cte_content(&mut self, require_now_expr: bool) -> Result<CteContent> {
203 if let Token::Word(w) = &self.parser.peek_token().token
205 && w.keyword == Keyword::NoKeyword
206 && w.quote_style.is_none()
207 && w.value.to_uppercase() == tql_parser::TQL
208 {
209 let tql = self.parse_tql_content_in_cte(require_now_expr)?;
210 return Ok(CteContent::Tql(tql));
211 }
212
213 let sql_query = self.parser.parse_query().context(error::SyntaxSnafu)?;
215 Ok(CteContent::Sql(sql_query))
216 }
217
218 fn parse_tql_content_in_cte(&mut self, require_now_expr: bool) -> Result<Tql> {
226 let tql_token = self.parser.next_token();
228 if tql_token.token == Token::EOF {
229 return Err(error::InvalidSqlSnafu {
230 msg: "Unexpected end of input while parsing TQL inside CTE".to_string(),
231 }
232 .build());
233 }
234
235 let start_location = tql_token.span.start;
236
237 let mut paren_depth = 0usize;
239 let end_location;
240
241 loop {
242 let token_with_span = self.parser.peek_token();
243
244 if token_with_span.token == Token::EOF {
246 return Err(error::InvalidSqlSnafu {
247 msg: "Unexpected end of input while parsing TQL inside CTE".to_string(),
248 }
249 .build());
250 }
251
252 if token_with_span.token == Token::RParen && paren_depth == 0 {
254 end_location = token_with_span.span.start;
255 break;
256 }
257
258 let consumed = self.parser.next_token();
260 match consumed.token {
261 Token::LParen => paren_depth += 1,
262 Token::RParen => {
263 paren_depth = paren_depth.saturating_sub(1);
266 }
267 _ => {}
268 }
269 }
270
271 let start_index = location_to_index(self.sql, &start_location);
273 let end_index = location_to_index(self.sql, &end_location);
274 let tql_string = &self.sql[start_index..end_index];
275 let tql_string = tql_string.trim();
276
277 let mut parser_ctx = ParserContext::new(&GreptimeDbDialect {}, tql_string)?;
278 let statement = parser_ctx.parse_tql(require_now_expr)?;
279
280 match statement {
281 Statement::Tql(Tql::Eval(eval)) => Ok(Tql::Eval(eval)),
282 Statement::Tql(_) => Err(error::InvalidSqlSnafu {
283 msg: "Only TQL EVAL is supported in CTEs".to_string(),
284 }
285 .build()),
286 _ => Err(error::InvalidSqlSnafu {
287 msg: "Expected a TQL statement inside CTE".to_string(),
288 }
289 .build()),
290 }
291 }
292}
293
294#[cfg(test)]
295mod tests {
296 use crate::dialect::GreptimeDbDialect;
297 use crate::parser::{ParseOptions, ParserContext};
298 use crate::parsers::with_tql_parser::CteContent;
299 use crate::statements::statement::Statement;
300 use crate::statements::tql::Tql;
301
302 #[test]
303 fn test_parse_hybrid_cte_with_parentheses_in_query() {
304 let sql = r#"
306 WITH tql_cte AS (
307 TQL EVAL (0, 100, '5s')
308 sum(rate(http_requests_total[1m])) + (max(cpu_usage) * (1 + 0.5))
309 )
310 SELECT * FROM tql_cte
311 "#;
312
313 let statements =
314 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
315 .unwrap();
316 assert_eq!(statements.len(), 1);
317
318 let Statement::Query(query) = &statements[0] else {
319 panic!("Expected Query statement");
320 };
321 let hybrid_cte = query.hybrid_cte.as_ref().unwrap();
322 assert_eq!(hybrid_cte.cte_tables.len(), 1);
323
324 assert!(matches!(
326 hybrid_cte.cte_tables[0].content,
327 CteContent::Tql(_)
328 ));
329
330 if let CteContent::Tql(Tql::Eval(eval)) = &hybrid_cte.cte_tables[0].content {
332 assert!(eval.query.contains("sum(rate(http_requests_total[1m]))"));
335 assert!(eval.query.contains("(max(cpu_usage) * (1 + 0.5))"));
336 assert!(eval.query.contains("+ (max"));
338 }
339 }
340
341 #[test]
342 fn test_parse_hybrid_cte_sql_and_tql() {
343 let sql = r#"
344 WITH
345 sql_cte(ts, value, label) AS (SELECT timestamp, val, name FROM metrics),
346 tql_cte(time, metric_value) AS (TQL EVAL (0, 100, '5s') cpu_usage)
347 SELECT s.ts, s.value, t.metric_value
348 FROM sql_cte s JOIN tql_cte t ON s.ts = t.time
349 "#;
350
351 let statements =
352 ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
353 .unwrap();
354 assert_eq!(statements.len(), 1);
355
356 let Statement::Query(query) = &statements[0] else {
357 panic!("Expected Query statement");
358 };
359 let hybrid_cte = query.hybrid_cte.as_ref().unwrap();
360 assert_eq!(hybrid_cte.cte_tables.len(), 1); let second_cte = &hybrid_cte.cte_tables[0];
364 assert!(matches!(second_cte.content, CteContent::Tql(_)));
365 assert_eq!(second_cte.columns.len(), 2);
366 assert_eq!(
367 second_cte
368 .columns
369 .iter()
370 .map(|x| x.to_string())
371 .collect::<Vec<_>>()
372 .join(" "),
373 "time metric_value"
374 );
375 }
376}