log_query/log_query.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use chrono::{DateTime, Datelike, Duration, NaiveDate, NaiveTime, TimeZone, Utc};
use serde::{Deserialize, Serialize};
use table::table_name::TableName;

use crate::error::{
    EndBeforeStartSnafu, InvalidDateFormatSnafu, InvalidSpanFormatSnafu, InvalidTimeFilterSnafu,
    Result,
};

/// GreptimeDB's log query request.
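///
/// # Example
///
/// A minimal sketch of building a request. The import path (`log_query`) and the
/// catalog/schema/table/column names are illustrative, not prescribed by this crate.
///
/// ```ignore
/// use log_query::{ColumnFilters, ContentFilter, Filters, LogExpr, LogQuery, TimeFilter};
/// use table::table_name::TableName;
///
/// let query = LogQuery {
///     table: TableName::new("greptime", "public", "app_logs"),
///     time_filter: TimeFilter {
///         start: Some("2024-12-01".to_string()),
///         end: None,
///         span: None,
///     },
///     filters: Filters::Single(ColumnFilters {
///         expr: Box::new(LogExpr::NamedIdent("message".to_string())),
///         filters: vec![ContentFilter::Contains("error".to_string())],
///     }),
///     ..Default::default()
/// };
/// ```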
#[derive(Debug, Serialize, Deserialize)]
pub struct LogQuery {
    // Global query parameters
    /// A fully qualified table name to query logs from.
    pub table: TableName,
    /// Specifies the time range for the log query. See [`TimeFilter`] for more details.
    pub time_filter: TimeFilter,
    /// Controls row skipping and fetch on the result set.
    pub limit: Limit,
    /// Columns to return in the result set.
    ///
    /// The columns can be either from the original log or derived from processing exprs.
    /// Default (empty) means all columns.
    ///
    /// TODO(ruihang): Do we need negative select?
    pub columns: Vec<String>,

    // Filters
    /// Conjunction of filters to apply to the raw logs.
    ///
    /// Filters here can apply to any [`LogExpr`].
    pub filters: Filters,
    /// Adjacent lines to return. Applies to all filters above.
    ///
    /// TODO(ruihang): Do we need per-filter context?
    pub context: Context,

    // Processors
    /// Expressions to evaluate after filtering.
    pub exprs: Vec<LogExpr>,
}

/// Nested filter structure supporting AND/OR/NOT relationships.
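///
/// # Example
///
/// A sketch of `(message contains "error") AND NOT (level = "debug")`; the import
/// path and column names are illustrative:
///
/// ```ignore
/// use log_query::{ColumnFilters, ContentFilter, EqualValue, Filters, LogExpr};
///
/// let has_error = ColumnFilters {
///     expr: Box::new(LogExpr::NamedIdent("message".to_string())),
///     filters: vec![ContentFilter::Contains("error".to_string())],
/// };
/// let is_debug = ColumnFilters {
///     expr: Box::new(LogExpr::NamedIdent("level".to_string())),
///     filters: vec![ContentFilter::Equal(EqualValue::String("debug".to_string()))],
/// };
/// let filters = Filters::And(vec![
///     Filters::Single(has_error),
///     Filters::Not(Box::new(Filters::Single(is_debug))),
/// ]);
/// ```
///
/// The `and`/`or` helpers below build the same shapes from anything convertible
/// into [`Filters`].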
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum Filters {
    /// Single filter condition
    Single(ColumnFilters),
    /// Multiple filters with AND relationship
    And(Vec<Filters>),
    /// Multiple filters with OR relationship
    Or(Vec<Filters>),
    /// Negation of the inner filter
    Not(Box<Filters>),
}

impl Default for Filters {
    fn default() -> Self {
        Filters::And(vec![])
    }
}

impl From<ColumnFilters> for Filters {
    fn from(filter: ColumnFilters) -> Self {
        Filters::Single(filter)
    }
}

impl Filters {
    /// Builds an AND filter from anything convertible into [`Filters`].
    pub fn and<T: Into<Filters>>(other: Vec<T>) -> Filters {
        Filters::And(other.into_iter().map(Into::into).collect())
    }

    /// Builds an OR filter from anything convertible into [`Filters`].
    pub fn or<T: Into<Filters>>(other: Vec<T>) -> Filters {
        Filters::Or(other.into_iter().map(Into::into).collect())
    }

    /// Wraps a single [`ColumnFilters`] condition.
    pub fn single(filter: ColumnFilters) -> Filters {
        Filters::Single(filter)
    }
}

/// Aggregation function with arguments and an optional alias.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AggFunc {
    /// Function name, e.g., "count", "sum", etc.
    pub name: String,
    /// Arguments to the function, e.g., column references or literals such as
    /// `LogExpr::NamedIdent("column1".to_string())`.
    pub args: Vec<LogExpr>,
    /// Optional alias for the result of this function.
    pub alias: Option<String>,
}

impl AggFunc {
    pub fn new(name: String, args: Vec<LogExpr>, alias: Option<String>) -> Self {
        Self { name, args, alias }
    }
}

/// Expression to calculate on logs after filtering.
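///
/// # Example
///
/// A sketch of an aggregation roughly equivalent to `count(message) AS n ... BY host`;
/// the import path and column names are illustrative:
///
/// ```ignore
/// use log_query::{AggFunc, LogExpr};
///
/// let expr = LogExpr::AggrFunc {
///     expr: vec![AggFunc::new(
///         "count".to_string(),
///         vec![LogExpr::NamedIdent("message".to_string())],
///         Some("n".to_string()),
///     )],
///     by: vec![LogExpr::NamedIdent("host".to_string())],
/// };
/// ```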
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum LogExpr {
    /// Reference to a column by name.
    NamedIdent(String),
    /// Reference to a column by position.
    PositionalIdent(usize),
    /// A literal value.
    Literal(String),
    /// Scalar function call with an optional alias.
    ScalarFunc {
        name: String,
        args: Vec<LogExpr>,
        alias: Option<String>,
    },
    /// Aggregation function with optional grouping.
    AggrFunc {
        /// Aggregation functions to evaluate, each with its name, arguments, and optional alias.
        expr: Vec<AggFunc>,
        /// Expressions to group by.
        by: Vec<LogExpr>,
    },
    /// Decompose a value into multiple fields.
    Decompose {
        expr: Box<LogExpr>,
        /// JSON, CSV, etc.
        schema: String,
        /// Fields with type name to extract from the decomposed value.
        fields: Vec<(String, String)>,
    },
    /// Binary operation between two expressions.
    BinaryOp {
        left: Box<LogExpr>,
        op: BinaryOperator,
        right: Box<LogExpr>,
    },
    /// Rename the result of an expression.
    Alias {
        expr: Box<LogExpr>,
        alias: String,
    },
    /// Filter applied to an expression's result.
    Filter {
        filter: ColumnFilters,
    },
}

impl Default for LogQuery {
    fn default() -> Self {
        Self {
            table: TableName::new("", "", ""),
            time_filter: Default::default(),
            filters: Filters::And(vec![]),
            limit: Limit::default(),
            context: Default::default(),
            columns: vec![],
            exprs: vec![],
        }
    }
}

/// Represents a time range for log query.
///
/// This struct allows various formats to express a time range from the user side
/// for best flexibility:
/// - Only `start` is provided: the `start` string can be any valid "date" or vaguer
///     content. For example: "2024-12-01", "2024-12", "2024", etc. It will be treated
///     as a time range corresponding to the provided date. E.g., "2024-12-01" refers
///     to the entire 24 hours of that day. In this case, the `start` field cannot be a
///     timestamp (like "2024-12-01T12:00:00Z").
/// - Both `start` and `end` are provided: the `start` and `end` strings can be either
///     a date or a timestamp. The `end` field is exclusive (`[start, end)`). When
///     `start` is a date it implies the start of the day, and when `end` is a date it
///     implies the end of the day.
/// - `span` with `start` OR `end`: the `span` string can be any valid "interval".
///     For example: "1024s", "1 week", "1 month", etc. The `span` field is applied to
///     the `start` or `end` field to calculate the other one correspondingly. If `start`
///     is provided, `end` is calculated as `start + span` and vice versa.
/// - Only `span` is provided: the `span` string can be any valid "interval" as mentioned
///     above. In this case, the current time (on the server side) is considered as the `end`.
/// - All fields are provided: in this case, the `start` and `end` fields take
///     priority, and the `span` field is ignored.
///
/// This struct doesn't require a timezone to be present. When the timezone is not
/// provided, the default timezone is filled in following the same rules as other queries.
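///
/// # Example
///
/// A sketch of the "date only" and "end plus span" forms described above; the import
/// path and concrete values are illustrative:
///
/// ```ignore
/// use log_query::TimeFilter;
///
/// // "2024-12-01" expands to [2024-12-01T00:00:00Z, 2024-12-02T00:00:00Z).
/// let mut whole_day = TimeFilter {
///     start: Some("2024-12-01".to_string()),
///     end: None,
///     span: None,
/// };
/// whole_day.canonicalize().unwrap();
///
/// // The 3600 seconds leading up to the given end timestamp.
/// let mut last_hour = TimeFilter {
///     start: None,
///     end: Some("2024-12-01T12:00:00Z".to_string()),
///     span: Some("3600".to_string()),
/// };
/// last_hour.canonicalize().unwrap();
/// ```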
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct TimeFilter {
    pub start: Option<String>,
    pub end: Option<String>,
    pub span: Option<String>,
}

impl TimeFilter {
    /// Validate and canonicalize the time filter.
    ///
    /// This function tries to fill in the missing fields and converts all dates to timestamps.
    #[allow(unused_assignments)] // false positive
    pub fn canonicalize(&mut self) -> Result<()> {
        let mut start_dt = None;
        let mut end_dt = None;

        if self.start.is_some() && self.end.is_none() && self.span.is_none() {
            // Only 'start' is provided
            let s = self.start.as_ref().unwrap();
            let (start, end_opt) = Self::parse_datetime(s)?;
            if end_opt.is_none() {
                return Err(InvalidTimeFilterSnafu {
                    filter: self.clone(),
                }
                .build());
            }
            start_dt = Some(start);
            end_dt = end_opt;
        } else if self.start.is_some() && self.end.is_some() {
            // Both 'start' and 'end' are provided. This also covers the case where all
            // three fields are set: 'start' and 'end' take priority and 'span' is ignored.
            let (start, _) = Self::parse_datetime(self.start.as_ref().unwrap())?;
            let (end, _) = Self::parse_datetime(self.end.as_ref().unwrap())?;
            start_dt = Some(start);
            end_dt = Some(end);
        } else if self.span.is_some() && (self.start.is_some() || self.end.is_some()) {
            // 'span' with 'start' or 'end'
            let span = Self::parse_span(self.span.as_ref().unwrap())?;
            if self.start.is_some() {
                let (start, _) = Self::parse_datetime(self.start.as_ref().unwrap())?;
                let end = start + span;
                start_dt = Some(start);
                end_dt = Some(end);
            } else {
                let (end, _) = Self::parse_datetime(self.end.as_ref().unwrap())?;
                let start = end - span;
                start_dt = Some(start);
                end_dt = Some(end);
            }
        } else if self.span.is_some() && self.start.is_none() && self.end.is_none() {
            // Only 'span' is provided
            let span = Self::parse_span(self.span.as_ref().unwrap())?;
            let end = Utc::now();
            let start = end - span;
            start_dt = Some(start);
            end_dt = Some(end);
        } else {
            // No usable combination of 'start', 'end', and 'span'
            return Err(InvalidTimeFilterSnafu {
                filter: self.clone(),
            }
            .build());
        }

        // Validate that end is after start
        if let (Some(start), Some(end)) = (&start_dt, &end_dt)
            && end <= start
        {
            return Err(EndBeforeStartSnafu {
                start: start.to_rfc3339(),
                end: end.to_rfc3339(),
            }
            .build());
        }

        // Update the fields with canonicalized timestamps
        if let Some(start) = start_dt {
            self.start = Some(start.to_rfc3339());
        }

        if let Some(end) = end_dt {
            self.end = Some(end.to_rfc3339());
        }

        Ok(())
    }

    /// Utility function that returns the start and an optional end `DateTime`. The end
    /// is present when the input is a date (without time) that covers a whole period.
    fn parse_datetime(s: &str) -> Result<(DateTime<Utc>, Option<DateTime<Utc>>)> {
        if let Ok(dt) = DateTime::parse_from_rfc3339(s) {
            Ok((dt.with_timezone(&Utc), None))
        } else {
            let formats = ["%Y-%m-%d", "%Y-%m", "%Y"];
            for format in &formats {
                // `NaiveDate::parse_from_str` requires a complete year-month-day, so pad
                // partial inputs ("2024-12", "2024") with the first day of the period
                // before parsing.
                let padded = match *format {
                    "%Y-%m-%d" => s.to_string(),
                    "%Y-%m" => format!("{s}-01"),
                    "%Y" => format!("{s}-01-01"),
                    _ => unreachable!(),
                };
                if let Ok(naive_date) = NaiveDate::parse_from_str(&padded, "%Y-%m-%d") {
                    let start = Utc.from_utc_datetime(
                        &naive_date.and_time(NaiveTime::from_hms_opt(0, 0, 0).unwrap()),
                    );
                    let end = match *format {
                        "%Y-%m-%d" => start + Duration::days(1),
                        "%Y-%m" => {
                            let next_month = if naive_date.month() == 12 {
                                NaiveDate::from_ymd_opt(naive_date.year() + 1, 1, 1).unwrap()
                            } else {
                                NaiveDate::from_ymd_opt(
                                    naive_date.year(),
                                    naive_date.month() + 1,
                                    1,
                                )
                                .unwrap()
                            };
                            Utc.from_utc_datetime(&next_month.and_hms_opt(0, 0, 0).unwrap())
                        }
                        "%Y" => {
                            let next_year =
                                NaiveDate::from_ymd_opt(naive_date.year() + 1, 1, 1).unwrap();
                            Utc.from_utc_datetime(&next_year.and_hms_opt(0, 0, 0).unwrap())
                        }
                        _ => unreachable!(),
                    };
                    return Ok((start, Some(end)));
                }
            }
            Err(InvalidDateFormatSnafu {
                input: s.to_string(),
            }
            .build())
        }
    }

    /// Utility function to parse a span. Durations like "1 week" or "1 month" are not
    /// implemented yet; only a plain number of seconds (e.g., "86400") is accepted.
    fn parse_span(s: &str) -> Result<Duration> {
        // Simplified parsing logic
        if let Ok(seconds) = s.parse::<i64>() {
            Ok(Duration::seconds(seconds))
        } else {
            Err(InvalidSpanFormatSnafu {
                input: s.to_string(),
            }
            .build())
        }
    }
}

/// Represents an expression with filters to query.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ColumnFilters {
    /// Expression to apply filters to. Can be a column reference or any other LogExpr.
    pub expr: Box<LogExpr>,
    /// Filters to apply to the expression result. Can be empty.
    pub filters: Vec<ContentFilter>,
}

#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum EqualValue {
    /// Exact match with a string value.
    String(String),
    /// Exact match with a boolean value.
    Boolean(bool),
    /// Exact match with a signed integer value.
    Int(i64),
    /// Exact match with an unsigned integer value.
    UInt(u64),
    /// Exact match with a float value.
    Float(f64),
}

impl From<String> for EqualValue {
    fn from(value: String) -> Self {
        EqualValue::String(value)
    }
}

impl From<bool> for EqualValue {
    fn from(value: bool) -> Self {
        EqualValue::Boolean(value)
    }
}

impl From<i64> for EqualValue {
    fn from(value: i64) -> Self {
        EqualValue::Int(value)
    }
}

impl From<f64> for EqualValue {
    fn from(value: f64) -> Self {
        EqualValue::Float(value)
    }
}

impl From<u64> for EqualValue {
    fn from(value: u64) -> Self {
        EqualValue::UInt(value)
    }
}

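/// A single predicate applied to the value produced by [`ColumnFilters::expr`].
///
/// # Example
///
/// A sketch of "between 500 and 599 (inclusive) or equal to 499"; the import path and
/// values are illustrative:
///
/// ```ignore
/// use log_query::{ConjunctionOperator, ContentFilter, EqualValue};
///
/// let filter = ContentFilter::Compound(
///     vec![
///         ContentFilter::Between {
///             start: "500".to_string(),
///             end: "599".to_string(),
///             start_inclusive: true,
///             end_inclusive: true,
///         },
///         ContentFilter::Equal(EqualValue::Int(499)),
///     ],
///     ConjunctionOperator::Or,
/// );
/// ```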
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum ContentFilter {
    // Search-based filters
    /// Only match the exact content.
    ///
    /// For example, if the content is "pale blue dot", the filter "pale" or "pale blue" will match.
    Exact(String),
    /// Match the content with a prefix.
    ///
    /// For example, if the content is "error message", the filter "err" or "error mess" will match.
    Prefix(String),
    /// Match the content with a postfix. Similar to `Prefix`.
    Postfix(String),
    /// Match the content with a substring.
    Contains(String),
    /// Match the content with a regex pattern. The pattern should be a valid Rust regex.
    Regex(String),

    // Value-based filters
    /// Content exists, a.k.a. not null.
    Exist,
    /// Value lies between `start` and `end`, with configurable inclusiveness on both ends.
    Between {
        start: String,
        end: String,
        start_inclusive: bool,
        end_inclusive: bool,
    },
    /// Value is greater than (or equal to, if `inclusive`) the given value.
    GreatThan {
        value: String,
        inclusive: bool,
    },
    /// Value is less than (or equal to, if `inclusive`) the given value.
    LessThan {
        value: String,
        inclusive: bool,
    },
    /// Value is one of the given candidates.
    In(Vec<String>),
    /// Boolean value is true.
    IsTrue,
    /// Boolean value is false.
    IsFalse,
    /// Value equals the given [`EqualValue`].
    Equal(EqualValue),

    // Compound filters
    /// Combination of filters joined by the given conjunction operator.
    Compound(Vec<ContentFilter>, ConjunctionOperator),
}

#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum ConjunctionOperator {
    And,
    Or,
}

/// Binary operators for [`LogExpr::BinaryOp`].
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum BinaryOperator {
    // Comparison operators
    Eq,
    Ne,
    Lt,
    Le,
    Gt,
    Ge,

    // Arithmetic operators
    Plus,
    Minus,
    Multiply,
    Divide,
    Modulo,

    // Logical operators
    And,
    Or,
}

/// Controls how many adjacent lines to return.
#[derive(Debug, Default, Serialize, Deserialize)]
pub enum Context {
    #[default]
    None,
    /// Specify the number of lines before and after the matched line separately.
    Lines(usize, usize),
    /// Specify the number of seconds before and after the matched line occurred.
    Seconds(usize, usize),
}

/// Represents limit and offset parameters for query pagination.
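///
/// # Example
///
/// A sketch of "skip the first 100 rows and return at most 50"; the values are
/// illustrative:
///
/// ```ignore
/// use log_query::Limit;
///
/// let limit = Limit {
///     skip: Some(100),
///     fetch: Some(50),
/// };
/// ```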
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct Limit {
    /// Optional number of items to skip before starting to return results
    pub skip: Option<usize>,
    /// Optional number of items to return after skipping
    pub fetch: Option<usize>,
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::error::Error;

    #[test]
    fn test_canonicalize() {
        // with 'start' only
        let mut tf = TimeFilter {
            start: Some("2023-10-01".to_string()),
            end: None,
            span: None,
        };
        tf.canonicalize().unwrap();
        assert!(tf.end.is_some());

        // with 'start' and 'span'
        let mut tf = TimeFilter {
            start: Some("2023-10-01T00:00:00Z".to_string()),
            end: None,
            span: Some("86400".to_string()), // 1 day in seconds
        };
        tf.canonicalize().unwrap();
        assert_eq!(tf.end.as_ref().unwrap(), "2023-10-02T00:00:00+00:00");

        // with 'end' and 'span'
        let mut tf = TimeFilter {
            start: None,
            end: Some("2023-10-02T00:00:00Z".to_string()),
            span: Some("86400".to_string()), // 1 day in seconds
        };
        tf.canonicalize().unwrap();
        assert_eq!(tf.start.as_ref().unwrap(), "2023-10-01T00:00:00+00:00");

        // with both 'start' and 'end'
        let mut tf = TimeFilter {
            start: Some("2023-10-01T00:00:00Z".to_string()),
            end: Some("2023-10-02T00:00:00Z".to_string()),
            span: None,
        };
        tf.canonicalize().unwrap();
        assert_eq!(tf.start.as_ref().unwrap(), "2023-10-01T00:00:00+00:00");
        assert_eq!(tf.end.as_ref().unwrap(), "2023-10-02T00:00:00+00:00");

        // with invalid date format
        let mut tf = TimeFilter {
            start: Some("invalid-date".to_string()),
            end: None,
            span: None,
        };
        let result = tf.canonicalize();
        assert!(matches!(result, Err(Error::InvalidDateFormat { .. })));

        // with missing 'start' and 'end'
        let mut tf = TimeFilter {
            start: None,
            end: None,
            span: None,
        };
        let result = tf.canonicalize();
        assert!(matches!(result, Err(Error::InvalidTimeFilter { .. })));

        // 'end' is before 'start'
        let mut tf = TimeFilter {
            start: Some("2023-10-02T00:00:00Z".to_string()),
            end: Some("2023-10-01T00:00:00Z".to_string()),
            span: None,
        };
        let result = tf.canonicalize();
        assert!(matches!(result, Err(Error::EndBeforeStart { .. })));
    }
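
    // A sketch of the "date only" expansion documented on `TimeFilter`: a month or a
    // year given as `start` should expand to the whole period. Relies on partial dates
    // being padded before parsing in `parse_datetime`.
    #[test]
    fn test_canonicalize_partial_dates() {
        let mut tf = TimeFilter {
            start: Some("2023-12".to_string()),
            end: None,
            span: None,
        };
        tf.canonicalize().unwrap();
        assert_eq!(tf.start.as_ref().unwrap(), "2023-12-01T00:00:00+00:00");
        assert_eq!(tf.end.as_ref().unwrap(), "2024-01-01T00:00:00+00:00");

        let mut tf = TimeFilter {
            start: Some("2023".to_string()),
            end: None,
            span: None,
        };
        tf.canonicalize().unwrap();
        assert_eq!(tf.start.as_ref().unwrap(), "2023-01-01T00:00:00+00:00");
        assert_eq!(tf.end.as_ref().unwrap(), "2024-01-01T00:00:00+00:00");
    }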
}