log_query/
log_query.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use chrono::{DateTime, Datelike, Duration, NaiveDate, NaiveTime, TimeZone, Utc};
16use serde::{Deserialize, Serialize};
17use table::table_name::TableName;
18
19use crate::error::{
20    EndBeforeStartSnafu, InvalidDateFormatSnafu, InvalidSpanFormatSnafu, InvalidTimeFilterSnafu,
21    Result,
22};
23
24/// GreptimeDB's log query request.
25#[derive(Debug, Serialize, Deserialize)]
26pub struct LogQuery {
27    // Global query parameters
28    /// A fully qualified table name to query logs from.
29    pub table: TableName,
30    /// Specifies the time range for the log query. See [`TimeFilter`] for more details.
31    pub time_filter: TimeFilter,
32    /// Controls row skipping and fetch on the result set.
33    pub limit: Limit,
34    /// Columns to return in the result set.
35    ///
36    /// The columns can be either from the original log or derived from processing exprs.
37    /// Default (empty) means all columns.
38    ///
39    /// TODO(ruihang): Do we need negative select?
40    pub columns: Vec<String>,
41
42    // Filters
43    /// Conjunction of filters to apply for the raw logs.
44    ///
45    /// Filters here can apply to any LogExpr.
46    pub filters: Filters,
47    /// Adjacent lines to return. Applies to all filters above.
48    ///
49    /// TODO(ruihang): Do we need per-filter context?
50    pub context: Context,
51
52    // Processors
53    /// Expressions to calculate after filter.
54    pub exprs: Vec<LogExpr>,
55}
56
57/// Nested filter structure that supports and/or relationships
58#[derive(Debug, Clone, Serialize, Deserialize)]
59pub enum Filters {
60    /// Single filter condition
61    Single(ColumnFilters),
62    /// Multiple filters with AND relationship
63    And(Vec<Filters>),
64    /// Multiple filters with OR relationship
65    Or(Vec<Filters>),
66    Not(Box<Filters>),
67}
68
69impl Default for Filters {
70    fn default() -> Self {
71        Filters::And(vec![])
72    }
73}
74
75impl From<ColumnFilters> for Filters {
76    fn from(filter: ColumnFilters) -> Self {
77        Filters::Single(filter)
78    }
79}
80
81impl Filters {
82    pub fn and<T: Into<Filters>>(other: Vec<T>) -> Filters {
83        Filters::And(other.into_iter().map(Into::into).collect())
84    }
85
86    pub fn or<T: Into<Filters>>(other: Vec<T>) -> Filters {
87        Filters::Or(other.into_iter().map(Into::into).collect())
88    }
89
90    pub fn single(filter: ColumnFilters) -> Filters {
91        Filters::Single(filter)
92    }
93}
94
95/// Expression to calculate on log after filtering.
96#[derive(Debug, Clone, Serialize, Deserialize)]
97pub enum LogExpr {
98    NamedIdent(String),
99    PositionalIdent(usize),
100    Literal(String),
101    ScalarFunc {
102        name: String,
103        args: Vec<LogExpr>,
104        alias: Option<String>,
105    },
106    AggrFunc {
107        name: String,
108        args: Vec<LogExpr>,
109        /// Optional range function parameter. Stands for the time range for both step and align.
110        range: Option<String>,
111        by: Vec<LogExpr>,
112        alias: Option<String>,
113    },
114    Decompose {
115        expr: Box<LogExpr>,
116        /// JSON, CSV, etc.
117        schema: String,
118        /// Fields with type name to extract from the decomposed value.
119        fields: Vec<(String, String)>,
120    },
121    BinaryOp {
122        left: Box<LogExpr>,
123        op: BinaryOperator,
124        right: Box<LogExpr>,
125    },
126    Alias {
127        expr: Box<LogExpr>,
128        alias: String,
129    },
130    Filter {
131        filter: ColumnFilters,
132    },
133}
134
135impl Default for LogQuery {
136    fn default() -> Self {
137        Self {
138            table: TableName::new("", "", ""),
139            time_filter: Default::default(),
140            filters: Filters::And(vec![]),
141            limit: Limit::default(),
142            context: Default::default(),
143            columns: vec![],
144            exprs: vec![],
145        }
146    }
147}
148
149/// Represents a time range for log query.
150///
151/// This struct allows various formats to express a time range from the user side
152/// for best flexibility:
153/// - Only `start` is provided: the `start` string can be any valid "date" or vaguer
154///     content. For example: "2024-12-01", "2024-12", "2024", etc. It will be treated
155///     as an time range corresponding to the provided date. E.g., "2024-12-01" refers
156///     to the entire 24 hours in that day. In this case, the `start` field cannot be a
157///     timestamp (like "2024-12-01T12:00:00Z").
158/// - Both `start` and `end` are provided: the `start` and `end` strings can be either
159///     a date or a timestamp. The `end` field is exclusive (`[start, end)`). When
160///     `start` is a date it implies the start of the day, and when `end` is a date it
161///     implies the end of the day.
162/// - `span` with `start` OR `end`: the `span` string can be any valid "interval"
163///     For example: "1024s", "1 week", "1 month", etc. The `span` field is applied to
164///     the `start` or `end` field to calculate the other one correspondingly. If `start`
165///     is provided, `end` is calculated as `start + span` and vice versa.
166/// - Only `span` is provided: the `span` string can be any valid "interval" as mentioned
167///     above. In this case, the current time (on the server side) is considered as the `end`.
168/// - All fields are provided: in this case, the `start` and `end` fields are considered
169///     with higher priority, and the `span` field is ignored.
170///
171/// This struct doesn't require a timezone to be presented. When the timezone is not
172/// provided, it will fill the default timezone with the same rules akin to other queries.
173#[derive(Debug, Clone, Default, Serialize, Deserialize)]
174pub struct TimeFilter {
175    pub start: Option<String>,
176    pub end: Option<String>,
177    pub span: Option<String>,
178}
179
180impl TimeFilter {
181    /// Validate and canonicalize the time filter.
182    ///
183    /// This function will try to fill the missing fields and convert all dates to timestamps
184    #[allow(unused_assignments)] // false positive
185    pub fn canonicalize(&mut self) -> Result<()> {
186        let mut start_dt = None;
187        let mut end_dt = None;
188
189        if self.start.is_some() && self.end.is_none() && self.span.is_none() {
190            // Only 'start' is provided
191            let s = self.start.as_ref().unwrap();
192            let (start, end_opt) = Self::parse_datetime(s)?;
193            if end_opt.is_none() {
194                return Err(InvalidTimeFilterSnafu {
195                    filter: self.clone(),
196                }
197                .build());
198            }
199            start_dt = Some(start);
200            end_dt = end_opt;
201        } else if self.start.is_some() && self.end.is_some() {
202            // Both 'start' and 'end' are provided
203            let (start, _) = Self::parse_datetime(self.start.as_ref().unwrap())?;
204            let (end, _) = Self::parse_datetime(self.end.as_ref().unwrap())?;
205            start_dt = Some(start);
206            end_dt = Some(end);
207        } else if self.span.is_some() && (self.start.is_some() || self.end.is_some()) {
208            // 'span' with 'start' or 'end'
209            let span = Self::parse_span(self.span.as_ref().unwrap())?;
210            if self.start.is_some() {
211                let (start, _) = Self::parse_datetime(self.start.as_ref().unwrap())?;
212                let end = start + span;
213                start_dt = Some(start);
214                end_dt = Some(end);
215            } else {
216                let (end, _) = Self::parse_datetime(self.end.as_ref().unwrap())?;
217                let start = end - span;
218                start_dt = Some(start);
219                end_dt = Some(end);
220            }
221        } else if self.span.is_some() && self.start.is_none() && self.end.is_none() {
222            // Only 'span' is provided
223            let span = Self::parse_span(self.span.as_ref().unwrap())?;
224            let end = Utc::now();
225            let start = end - span;
226            start_dt = Some(start);
227            end_dt = Some(end);
228        } else if self.start.is_some() && self.span.is_some() && self.end.is_some() {
229            // All fields are provided; 'start' and 'end' take priority
230            let (start, _) = Self::parse_datetime(self.start.as_ref().unwrap())?;
231            let (end, _) = Self::parse_datetime(self.end.as_ref().unwrap())?;
232            start_dt = Some(start);
233            end_dt = Some(end);
234        } else {
235            // Exception
236            return Err(InvalidTimeFilterSnafu {
237                filter: self.clone(),
238            }
239            .build());
240        }
241
242        // Validate that end is after start
243        if let (Some(start), Some(end)) = (&start_dt, &end_dt)
244            && end <= start
245        {
246            return Err(EndBeforeStartSnafu {
247                start: start.to_rfc3339(),
248                end: end.to_rfc3339(),
249            }
250            .build());
251        }
252
253        // Update the fields with canonicalized timestamps
254        if let Some(start) = start_dt {
255            self.start = Some(start.to_rfc3339());
256        }
257
258        if let Some(end) = end_dt {
259            self.end = Some(end.to_rfc3339());
260        }
261
262        Ok(())
263    }
264
265    /// Util function returns a start and optional end DateTime
266    fn parse_datetime(s: &str) -> Result<(DateTime<Utc>, Option<DateTime<Utc>>)> {
267        if let Ok(dt) = DateTime::parse_from_rfc3339(s) {
268            Ok((dt.with_timezone(&Utc), None))
269        } else {
270            let formats = ["%Y-%m-%d", "%Y-%m", "%Y"];
271            for format in &formats {
272                if let Ok(naive_date) = NaiveDate::parse_from_str(s, format) {
273                    let start = Utc.from_utc_datetime(
274                        &naive_date.and_time(NaiveTime::from_hms_opt(0, 0, 0).unwrap()),
275                    );
276                    let end = match *format {
277                        "%Y-%m-%d" => start + Duration::days(1),
278                        "%Y-%m" => {
279                            let next_month = if naive_date.month() == 12 {
280                                NaiveDate::from_ymd_opt(naive_date.year() + 1, 1, 1).unwrap()
281                            } else {
282                                NaiveDate::from_ymd_opt(
283                                    naive_date.year(),
284                                    naive_date.month() + 1,
285                                    1,
286                                )
287                                .unwrap()
288                            };
289                            Utc.from_utc_datetime(&next_month.and_hms_opt(0, 0, 0).unwrap())
290                        }
291                        "%Y" => {
292                            let next_year =
293                                NaiveDate::from_ymd_opt(naive_date.year() + 1, 1, 1).unwrap();
294                            Utc.from_utc_datetime(&next_year.and_hms_opt(0, 0, 0).unwrap())
295                        }
296                        _ => unreachable!(),
297                    };
298                    return Ok((start, Some(end)));
299                }
300            }
301            Err(InvalidDateFormatSnafu {
302                input: s.to_string(),
303            }
304            .build())
305        }
306    }
307
308    /// Util function handles durations like "1 week", "1 month", etc (unimplemented).
309    fn parse_span(s: &str) -> Result<Duration> {
310        // Simplified parsing logic
311        if let Ok(seconds) = s.parse::<i64>() {
312            Ok(Duration::seconds(seconds))
313        } else {
314            Err(InvalidSpanFormatSnafu {
315                input: s.to_string(),
316            }
317            .build())
318        }
319    }
320}
321
322/// Represents an expression with filters to query.
323#[derive(Debug, Clone, Serialize, Deserialize)]
324pub struct ColumnFilters {
325    /// Expression to apply filters to. Can be a column reference or any other LogExpr.
326    pub expr: Box<LogExpr>,
327    /// Filters to apply to the expression result. Can be empty.
328    pub filters: Vec<ContentFilter>,
329}
330
331#[derive(Clone, Debug, Serialize, Deserialize)]
332pub enum EqualValue {
333    /// Exact match with a string value.
334    String(String),
335    /// Exact match with a boolean value.
336    Boolean(bool),
337    /// Exact match with a number value.
338    Int(i64),
339    /// Exact match with an unsigned integer value.
340    UInt(u64),
341    /// Exact match with a float value.
342    Float(f64),
343}
344
345impl From<String> for EqualValue {
346    fn from(value: String) -> Self {
347        EqualValue::String(value)
348    }
349}
350
351impl From<bool> for EqualValue {
352    fn from(value: bool) -> Self {
353        EqualValue::Boolean(value)
354    }
355}
356
357impl From<i64> for EqualValue {
358    fn from(value: i64) -> Self {
359        EqualValue::Int(value)
360    }
361}
362
363impl From<f64> for EqualValue {
364    fn from(value: f64) -> Self {
365        EqualValue::Float(value)
366    }
367}
368
369impl From<u64> for EqualValue {
370    fn from(value: u64) -> Self {
371        EqualValue::UInt(value)
372    }
373}
374
375#[derive(Clone, Debug, Serialize, Deserialize)]
376pub enum ContentFilter {
377    // Search-based filters
378    /// Only match the exact content.
379    ///
380    /// For example, if the content is "pale blue dot", the filter "pale" or "pale blue" will match.
381    Exact(String),
382    /// Match the content with a prefix.
383    ///
384    /// For example, if the content is "error message", the filter "err" or "error mess" will match.
385    Prefix(String),
386    /// Match the content with a postfix. Similar to `Prefix`.
387    Postfix(String),
388    /// Match the content with a substring.
389    Contains(String),
390    /// Match the content with a regex pattern. The pattern should be a valid Rust regex.
391    Regex(String),
392
393    // Value-based filters
394    /// Content exists, a.k.a. not null.
395    Exist,
396    Between {
397        start: String,
398        end: String,
399        start_inclusive: bool,
400        end_inclusive: bool,
401    },
402    GreatThan {
403        value: String,
404        inclusive: bool,
405    },
406    LessThan {
407        value: String,
408        inclusive: bool,
409    },
410    In(Vec<String>),
411    IsTrue,
412    IsFalse,
413    Equal(EqualValue),
414
415    // Compound filters
416    Compound(Vec<ContentFilter>, ConjunctionOperator),
417}
418
419#[derive(Clone, Debug, Serialize, Deserialize)]
420pub enum ConjunctionOperator {
421    And,
422    Or,
423}
424
425/// Binary operators for LogExpr::BinaryOp.
426#[derive(Clone, Debug, Serialize, Deserialize)]
427pub enum BinaryOperator {
428    // Comparison operators
429    Eq,
430    Ne,
431    Lt,
432    Le,
433    Gt,
434    Ge,
435
436    // Arithmetic operators
437    Plus,
438    Minus,
439    Multiply,
440    Divide,
441    Modulo,
442
443    // Logical operators
444    And,
445    Or,
446}
447
448/// Controls how many adjacent lines to return.
449#[derive(Debug, Default, Serialize, Deserialize)]
450pub enum Context {
451    #[default]
452    None,
453    /// Specify the number of lines before and after the matched line separately.
454    Lines(usize, usize),
455    /// Specify the number of seconds before and after the matched line occurred.
456    Seconds(usize, usize),
457}
458
459/// Represents limit and offset parameters for query pagination.
460#[derive(Debug, Default, Serialize, Deserialize)]
461pub struct Limit {
462    /// Optional number of items to skip before starting to return results
463    pub skip: Option<usize>,
464    /// Optional number of items to return after skipping
465    pub fetch: Option<usize>,
466}
467
#[cfg(test)]
mod tests {
    use super::*;
    use crate::error::Error;

    /// Shorthand for building a [`TimeFilter`] from optional string slices.
    fn time_filter(start: Option<&str>, end: Option<&str>, span: Option<&str>) -> TimeFilter {
        TimeFilter {
            start: start.map(String::from),
            end: end.map(String::from),
            span: span.map(String::from),
        }
    }

    #[test]
    fn test_canonicalize() {
        // One day, in seconds.
        const ONE_DAY: &str = "86400";

        // A bare date as `start` expands into a range, so `end` gets filled in.
        let mut tf = time_filter(Some("2023-10-01"), None, None);
        tf.canonicalize().unwrap();
        assert!(tf.end.is_some());

        // `start` + `span` derives `end`.
        let mut tf = time_filter(Some("2023-10-01T00:00:00Z"), None, Some(ONE_DAY));
        tf.canonicalize().unwrap();
        assert_eq!(tf.end.as_deref(), Some("2023-10-02T00:00:00+00:00"));

        // `end` + `span` derives `start`.
        let mut tf = time_filter(None, Some("2023-10-02T00:00:00Z"), Some(ONE_DAY));
        tf.canonicalize().unwrap();
        assert_eq!(tf.start.as_deref(), Some("2023-10-01T00:00:00+00:00"));

        // Explicit `start` and `end` are normalized to RFC 3339.
        let mut tf = time_filter(
            Some("2023-10-01T00:00:00Z"),
            Some("2023-10-02T00:00:00Z"),
            None,
        );
        tf.canonicalize().unwrap();
        assert_eq!(tf.start.as_deref(), Some("2023-10-01T00:00:00+00:00"));
        assert_eq!(tf.end.as_deref(), Some("2023-10-02T00:00:00+00:00"));

        // Unparseable date.
        let result = time_filter(Some("invalid-date"), None, None).canonicalize();
        assert!(matches!(result, Err(Error::InvalidDateFormat { .. })));

        // Nothing to anchor the range.
        let result = time_filter(None, None, None).canonicalize();
        assert!(matches!(result, Err(Error::InvalidTimeFilter { .. })));

        // `end` earlier than `start`.
        let result = time_filter(
            Some("2023-10-02T00:00:00Z"),
            Some("2023-10-01T00:00:00Z"),
            None,
        )
        .canonicalize();
        assert!(matches!(result, Err(Error::EndBeforeStart { .. })));
    }
}