1use chrono::{DateTime, Datelike, Duration, NaiveDate, NaiveTime, TimeZone, Utc};
16use serde::{Deserialize, Serialize};
17use table::table_name::TableName;
18
19use crate::error::{
20 EndBeforeStartSnafu, InvalidDateFormatSnafu, InvalidSpanFormatSnafu, InvalidTimeFilterSnafu,
21 Result,
22};
23
24#[derive(Debug, Serialize, Deserialize)]
26pub struct LogQuery {
27 pub table: TableName,
30 pub time_filter: TimeFilter,
32 pub limit: Limit,
34 pub columns: Vec<String>,
41
42 pub filters: Filters,
47 pub context: Context,
51
52 pub exprs: Vec<LogExpr>,
55}
56
57#[derive(Debug, Clone, Serialize, Deserialize)]
59pub enum Filters {
60 Single(ColumnFilters),
62 And(Vec<Filters>),
64 Or(Vec<Filters>),
66 Not(Box<Filters>),
67}
68
69impl Default for Filters {
70 fn default() -> Self {
71 Filters::And(vec![])
72 }
73}
74
75impl From<ColumnFilters> for Filters {
76 fn from(filter: ColumnFilters) -> Self {
77 Filters::Single(filter)
78 }
79}
80
81impl Filters {
82 pub fn and<T: Into<Filters>>(other: Vec<T>) -> Filters {
83 Filters::And(other.into_iter().map(Into::into).collect())
84 }
85
86 pub fn or<T: Into<Filters>>(other: Vec<T>) -> Filters {
87 Filters::Or(other.into_iter().map(Into::into).collect())
88 }
89
90 pub fn single(filter: ColumnFilters) -> Filters {
91 Filters::Single(filter)
92 }
93}
94#[derive(Debug, Clone, Serialize, Deserialize)]
96pub struct AggFunc {
97 pub name: String,
99 pub args: Vec<LogExpr>,
101 pub alias: Option<String>,
102}
103
104impl AggFunc {
105 pub fn new(name: String, args: Vec<LogExpr>, alias: Option<String>) -> Self {
106 Self { name, args, alias }
107 }
108}
109
110#[derive(Debug, Clone, Serialize, Deserialize)]
112pub enum LogExpr {
113 NamedIdent(String),
114 PositionalIdent(usize),
115 Literal(String),
116 ScalarFunc {
117 name: String,
118 args: Vec<LogExpr>,
119 alias: Option<String>,
120 },
121 AggrFunc {
123 expr: Vec<AggFunc>,
125 by: Vec<LogExpr>,
126 },
127 Decompose {
128 expr: Box<LogExpr>,
129 schema: String,
131 fields: Vec<(String, String)>,
133 },
134 BinaryOp {
135 left: Box<LogExpr>,
136 op: BinaryOperator,
137 right: Box<LogExpr>,
138 },
139 Alias {
140 expr: Box<LogExpr>,
141 alias: String,
142 },
143 Filter {
144 filter: ColumnFilters,
145 },
146}
147
148impl Default for LogQuery {
149 fn default() -> Self {
150 Self {
151 table: TableName::new("", "", ""),
152 time_filter: Default::default(),
153 filters: Filters::And(vec![]),
154 limit: Limit::default(),
155 context: Default::default(),
156 columns: vec![],
157 exprs: vec![],
158 }
159 }
160}
161
162#[derive(Debug, Clone, Default, Serialize, Deserialize)]
187pub struct TimeFilter {
188 pub start: Option<String>,
189 pub end: Option<String>,
190 pub span: Option<String>,
191}
192
193impl TimeFilter {
194 #[allow(unused_assignments)] pub fn canonicalize(&mut self) -> Result<()> {
199 let mut start_dt = None;
200 let mut end_dt = None;
201
202 if self.start.is_some() && self.end.is_none() && self.span.is_none() {
203 let s = self.start.as_ref().unwrap();
205 let (start, end_opt) = Self::parse_datetime(s)?;
206 if end_opt.is_none() {
207 return Err(InvalidTimeFilterSnafu {
208 filter: self.clone(),
209 }
210 .build());
211 }
212 start_dt = Some(start);
213 end_dt = end_opt;
214 } else if self.start.is_some() && self.end.is_some() {
215 let (start, _) = Self::parse_datetime(self.start.as_ref().unwrap())?;
217 let (end, _) = Self::parse_datetime(self.end.as_ref().unwrap())?;
218 start_dt = Some(start);
219 end_dt = Some(end);
220 } else if self.span.is_some() && (self.start.is_some() || self.end.is_some()) {
221 let span = Self::parse_span(self.span.as_ref().unwrap())?;
223 if self.start.is_some() {
224 let (start, _) = Self::parse_datetime(self.start.as_ref().unwrap())?;
225 let end = start + span;
226 start_dt = Some(start);
227 end_dt = Some(end);
228 } else {
229 let (end, _) = Self::parse_datetime(self.end.as_ref().unwrap())?;
230 let start = end - span;
231 start_dt = Some(start);
232 end_dt = Some(end);
233 }
234 } else if self.span.is_some() && self.start.is_none() && self.end.is_none() {
235 let span = Self::parse_span(self.span.as_ref().unwrap())?;
237 let end = Utc::now();
238 let start = end - span;
239 start_dt = Some(start);
240 end_dt = Some(end);
241 } else if self.start.is_some() && self.span.is_some() && self.end.is_some() {
242 let (start, _) = Self::parse_datetime(self.start.as_ref().unwrap())?;
244 let (end, _) = Self::parse_datetime(self.end.as_ref().unwrap())?;
245 start_dt = Some(start);
246 end_dt = Some(end);
247 } else {
248 return Err(InvalidTimeFilterSnafu {
250 filter: self.clone(),
251 }
252 .build());
253 }
254
255 if let (Some(start), Some(end)) = (&start_dt, &end_dt)
257 && end <= start
258 {
259 return Err(EndBeforeStartSnafu {
260 start: start.to_rfc3339(),
261 end: end.to_rfc3339(),
262 }
263 .build());
264 }
265
266 if let Some(start) = start_dt {
268 self.start = Some(start.to_rfc3339());
269 }
270
271 if let Some(end) = end_dt {
272 self.end = Some(end.to_rfc3339());
273 }
274
275 Ok(())
276 }
277
278 fn parse_datetime(s: &str) -> Result<(DateTime<Utc>, Option<DateTime<Utc>>)> {
280 if let Ok(dt) = DateTime::parse_from_rfc3339(s) {
281 Ok((dt.with_timezone(&Utc), None))
282 } else {
283 let formats = ["%Y-%m-%d", "%Y-%m", "%Y"];
284 for format in &formats {
285 if let Ok(naive_date) = NaiveDate::parse_from_str(s, format) {
286 let start = Utc.from_utc_datetime(
287 &naive_date.and_time(NaiveTime::from_hms_opt(0, 0, 0).unwrap()),
288 );
289 let end = match *format {
290 "%Y-%m-%d" => start + Duration::days(1),
291 "%Y-%m" => {
292 let next_month = if naive_date.month() == 12 {
293 NaiveDate::from_ymd_opt(naive_date.year() + 1, 1, 1).unwrap()
294 } else {
295 NaiveDate::from_ymd_opt(
296 naive_date.year(),
297 naive_date.month() + 1,
298 1,
299 )
300 .unwrap()
301 };
302 Utc.from_utc_datetime(&next_month.and_hms_opt(0, 0, 0).unwrap())
303 }
304 "%Y" => {
305 let next_year =
306 NaiveDate::from_ymd_opt(naive_date.year() + 1, 1, 1).unwrap();
307 Utc.from_utc_datetime(&next_year.and_hms_opt(0, 0, 0).unwrap())
308 }
309 _ => unreachable!(),
310 };
311 return Ok((start, Some(end)));
312 }
313 }
314 Err(InvalidDateFormatSnafu {
315 input: s.to_string(),
316 }
317 .build())
318 }
319 }
320
321 fn parse_span(s: &str) -> Result<Duration> {
323 if let Ok(seconds) = s.parse::<i64>() {
325 Ok(Duration::seconds(seconds))
326 } else {
327 Err(InvalidSpanFormatSnafu {
328 input: s.to_string(),
329 }
330 .build())
331 }
332 }
333}
334
335#[derive(Debug, Clone, Serialize, Deserialize)]
337pub struct ColumnFilters {
338 pub expr: Box<LogExpr>,
340 pub filters: Vec<ContentFilter>,
342}
343
344#[derive(Clone, Debug, Serialize, Deserialize)]
345pub enum EqualValue {
346 String(String),
348 Boolean(bool),
350 Int(i64),
352 UInt(u64),
354 Float(f64),
356}
357
358impl From<String> for EqualValue {
359 fn from(value: String) -> Self {
360 EqualValue::String(value)
361 }
362}
363
364impl From<bool> for EqualValue {
365 fn from(value: bool) -> Self {
366 EqualValue::Boolean(value)
367 }
368}
369
370impl From<i64> for EqualValue {
371 fn from(value: i64) -> Self {
372 EqualValue::Int(value)
373 }
374}
375
376impl From<f64> for EqualValue {
377 fn from(value: f64) -> Self {
378 EqualValue::Float(value)
379 }
380}
381
382impl From<u64> for EqualValue {
383 fn from(value: u64) -> Self {
384 EqualValue::UInt(value)
385 }
386}
387
388#[derive(Clone, Debug, Serialize, Deserialize)]
389pub enum ContentFilter {
390 Exact(String),
395 Prefix(String),
399 Postfix(String),
401 Contains(String),
403 Regex(String),
405
406 Exist,
409 Between {
410 start: String,
411 end: String,
412 start_inclusive: bool,
413 end_inclusive: bool,
414 },
415 GreatThan {
416 value: String,
417 inclusive: bool,
418 },
419 LessThan {
420 value: String,
421 inclusive: bool,
422 },
423 In(Vec<String>),
424 IsTrue,
425 IsFalse,
426 Equal(EqualValue),
427
428 Compound(Vec<ContentFilter>, ConjunctionOperator),
430}
431
432#[derive(Clone, Debug, Serialize, Deserialize)]
433pub enum ConjunctionOperator {
434 And,
435 Or,
436}
437
438#[derive(Clone, Debug, Serialize, Deserialize)]
440pub enum BinaryOperator {
441 Eq,
443 Ne,
444 Lt,
445 Le,
446 Gt,
447 Ge,
448
449 Plus,
451 Minus,
452 Multiply,
453 Divide,
454 Modulo,
455
456 And,
458 Or,
459}
460
461#[derive(Debug, Default, Serialize, Deserialize)]
463pub enum Context {
464 #[default]
465 None,
466 Lines(usize, usize),
468 Seconds(usize, usize),
470}
471
472#[derive(Debug, Default, Serialize, Deserialize)]
474pub struct Limit {
475 pub skip: Option<usize>,
477 pub fetch: Option<usize>,
479}
480
481#[cfg(test)]
482mod tests {
483 use super::*;
484 use crate::error::Error;
485
486 #[test]
487 fn test_canonicalize() {
488 let mut tf = TimeFilter {
490 start: Some("2023-10-01".to_string()),
491 end: None,
492 span: None,
493 };
494 tf.canonicalize().unwrap();
495 assert!(tf.end.is_some());
496
497 let mut tf = TimeFilter {
499 start: Some("2023-10-01T00:00:00Z".to_string()),
500 end: None,
501 span: Some("86400".to_string()), };
503 tf.canonicalize().unwrap();
504 assert_eq!(tf.end.as_ref().unwrap(), "2023-10-02T00:00:00+00:00");
505
506 let mut tf = TimeFilter {
508 start: None,
509 end: Some("2023-10-02T00:00:00Z".to_string()),
510 span: Some("86400".to_string()), };
512 tf.canonicalize().unwrap();
513 assert_eq!(tf.start.as_ref().unwrap(), "2023-10-01T00:00:00+00:00");
514
515 let mut tf = TimeFilter {
517 start: Some("2023-10-01T00:00:00Z".to_string()),
518 end: Some("2023-10-02T00:00:00Z".to_string()),
519 span: None,
520 };
521 tf.canonicalize().unwrap();
522 assert_eq!(tf.start.as_ref().unwrap(), "2023-10-01T00:00:00+00:00");
523 assert_eq!(tf.end.as_ref().unwrap(), "2023-10-02T00:00:00+00:00");
524
525 let mut tf = TimeFilter {
527 start: Some("invalid-date".to_string()),
528 end: None,
529 span: None,
530 };
531 let result = tf.canonicalize();
532 assert!(matches!(result, Err(Error::InvalidDateFormat { .. })));
533
534 let mut tf = TimeFilter {
536 start: None,
537 end: None,
538 span: None,
539 };
540 let result = tf.canonicalize();
541 assert!(matches!(result, Err(Error::InvalidTimeFilter { .. })));
542
543 let mut tf = TimeFilter {
545 start: Some("2023-10-02T00:00:00Z".to_string()),
546 end: Some("2023-10-01T00:00:00Z".to_string()),
547 span: None,
548 };
549 let result = tf.canonicalize();
550 assert!(matches!(result, Err(Error::EndBeforeStart { .. })));
551 }
552}