Skip to main content

log_query/
log_query.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use chrono::{DateTime, Datelike, Duration, NaiveDate, NaiveTime, TimeZone, Utc};
16use serde::{Deserialize, Serialize};
17use table::table_name::TableName;
18
19use crate::error::{
20    EndBeforeStartSnafu, InvalidDateFormatSnafu, InvalidSpanFormatSnafu, InvalidTimeFilterSnafu,
21    Result,
22};
23
24/// GreptimeDB's log query request.
25#[derive(Debug, Serialize, Deserialize)]
26pub struct LogQuery {
27    // Global query parameters
28    /// A fully qualified table name to query logs from.
29    pub table: TableName,
30    /// Specifies the time range for the log query. See [`TimeFilter`] for more details.
31    pub time_filter: TimeFilter,
32    /// Controls row skipping and fetch on the result set.
33    pub limit: Limit,
34    /// Columns to return in the result set.
35    ///
36    /// The columns can be either from the original log or derived from processing exprs.
37    /// Default (empty) means all columns.
38    ///
39    /// TODO(ruihang): Do we need negative select?
40    pub columns: Vec<String>,
41
42    // Filters
43    /// Conjunction of filters to apply for the raw logs.
44    ///
45    /// Filters here can apply to any LogExpr.
46    pub filters: Filters,
47    /// Adjacent lines to return. Applies to all filters above.
48    ///
49    /// TODO(ruihang): Do we need per-filter context?
50    pub context: Context,
51
52    // Processors
53    /// Expressions to calculate after filter.
54    pub exprs: Vec<LogExpr>,
55}
56
57/// Nested filter structure that supports and/or relationships
58#[derive(Debug, Clone, Serialize, Deserialize)]
59pub enum Filters {
60    /// Single filter condition
61    Single(ColumnFilters),
62    /// Multiple filters with AND relationship
63    And(Vec<Filters>),
64    /// Multiple filters with OR relationship
65    Or(Vec<Filters>),
66    Not(Box<Filters>),
67}
68
69impl Default for Filters {
70    fn default() -> Self {
71        Filters::And(vec![])
72    }
73}
74
75impl From<ColumnFilters> for Filters {
76    fn from(filter: ColumnFilters) -> Self {
77        Filters::Single(filter)
78    }
79}
80
81impl Filters {
82    pub fn and<T: Into<Filters>>(other: Vec<T>) -> Filters {
83        Filters::And(other.into_iter().map(Into::into).collect())
84    }
85
86    pub fn or<T: Into<Filters>>(other: Vec<T>) -> Filters {
87        Filters::Or(other.into_iter().map(Into::into).collect())
88    }
89
90    pub fn single(filter: ColumnFilters) -> Filters {
91        Filters::Single(filter)
92    }
93}
94/// Aggregation function with optional range and alias.
95#[derive(Debug, Clone, Serialize, Deserialize)]
96pub struct AggFunc {
97    /// Function name, e.g., "count", "sum", etc.
98    pub name: String,
99    /// Arguments to the function. e.g., column references or literals. LogExpr::NamedIdent("column1".to_string())
100    pub args: Vec<LogExpr>,
101    pub alias: Option<String>,
102}
103
104impl AggFunc {
105    pub fn new(name: String, args: Vec<LogExpr>, alias: Option<String>) -> Self {
106        Self { name, args, alias }
107    }
108}
109
110/// Expression to calculate on log after filtering.
111#[derive(Debug, Clone, Serialize, Deserialize)]
112pub enum LogExpr {
113    NamedIdent(String),
114    PositionalIdent(usize),
115    Literal(String),
116    ScalarFunc {
117        name: String,
118        args: Vec<LogExpr>,
119        alias: Option<String>,
120    },
121    /// Aggregation function with optional grouping.
122    AggrFunc {
123        /// Function name, arguments, and optional alias.
124        expr: Vec<AggFunc>,
125        by: Vec<LogExpr>,
126    },
127    Decompose {
128        expr: Box<LogExpr>,
129        /// JSON, CSV, etc.
130        schema: String,
131        /// Fields with type name to extract from the decomposed value.
132        fields: Vec<(String, String)>,
133    },
134    BinaryOp {
135        left: Box<LogExpr>,
136        op: BinaryOperator,
137        right: Box<LogExpr>,
138    },
139    Alias {
140        expr: Box<LogExpr>,
141        alias: String,
142    },
143    Filter {
144        filter: ColumnFilters,
145    },
146}
147
148impl Default for LogQuery {
149    fn default() -> Self {
150        Self {
151            table: TableName::new("", "", ""),
152            time_filter: Default::default(),
153            filters: Filters::And(vec![]),
154            limit: Limit::default(),
155            context: Default::default(),
156            columns: vec![],
157            exprs: vec![],
158        }
159    }
160}
161
162/// Represents a time range for log query.
163///
164/// This struct allows various formats to express a time range from the user side
165/// for best flexibility:
166/// - Only `start` is provided: the `start` string can be any valid "date" or vaguer
167///     content. For example: "2024-12-01", "2024-12", "2024", etc. It will be treated
168///     as an time range corresponding to the provided date. E.g., "2024-12-01" refers
169///     to the entire 24 hours in that day. In this case, the `start` field cannot be a
170///     timestamp (like "2024-12-01T12:00:00Z").
171/// - Both `start` and `end` are provided: the `start` and `end` strings can be either
172///     a date or a timestamp. The `end` field is exclusive (`[start, end)`). When
173///     `start` is a date it implies the start of the day, and when `end` is a date it
174///     implies the end of the day.
175/// - `span` with `start` OR `end`: the `span` string can be any valid "interval"
176///     For example: "1024s", "1 week", "1 month", etc. The `span` field is applied to
177///     the `start` or `end` field to calculate the other one correspondingly. If `start`
178///     is provided, `end` is calculated as `start + span` and vice versa.
179/// - Only `span` is provided: the `span` string can be any valid "interval" as mentioned
180///     above. In this case, the current time (on the server side) is considered as the `end`.
181/// - All fields are provided: in this case, the `start` and `end` fields are considered
182///     with higher priority, and the `span` field is ignored.
183///
184/// This struct doesn't require a timezone to be presented. When the timezone is not
185/// provided, it will fill the default timezone with the same rules akin to other queries.
186#[derive(Debug, Clone, Default, Serialize, Deserialize)]
187pub struct TimeFilter {
188    pub start: Option<String>,
189    pub end: Option<String>,
190    pub span: Option<String>,
191}
192
193impl TimeFilter {
194    /// Validate and canonicalize the time filter.
195    ///
196    /// This function will try to fill the missing fields and convert all dates to timestamps
197    #[allow(unused_assignments)] // false positive
198    pub fn canonicalize(&mut self) -> Result<()> {
199        let mut start_dt = None;
200        let mut end_dt = None;
201
202        match (&self.start, &self.end, &self.span) {
203            (Some(start), None, None) => {
204                let (start, end_opt) = Self::parse_datetime(start)?;
205                if end_opt.is_none() {
206                    return Err(InvalidTimeFilterSnafu {
207                        filter: self.clone(),
208                    }
209                    .build());
210                }
211                start_dt = Some(start);
212                end_dt = end_opt;
213            }
214            (Some(start), Some(end), _) => {
215                // Both 'start' and 'end' are provided
216                let (start, _) = Self::parse_datetime(start)?;
217                let (end, _) = Self::parse_datetime(end)?;
218                start_dt = Some(start);
219                end_dt = Some(end);
220            }
221            (Some(start), None, Some(span)) => {
222                let (start, _) = Self::parse_datetime(start)?;
223                let span = Self::parse_span(span)?;
224                let end = start + span;
225                start_dt = Some(start);
226                end_dt = Some(end);
227            }
228            (None, Some(end), Some(span)) => {
229                let (end, _) = Self::parse_datetime(end)?;
230                let span = Self::parse_span(span)?;
231                let start = end - span;
232                start_dt = Some(start);
233                end_dt = Some(end);
234            }
235            (None, None, Some(span)) => {
236                let span = Self::parse_span(span)?;
237                let end = Utc::now();
238                let start = end - span;
239                start_dt = Some(start);
240                end_dt = Some(end);
241            }
242            _ => {
243                // Exception
244                return Err(InvalidTimeFilterSnafu {
245                    filter: self.clone(),
246                }
247                .build());
248            }
249        }
250
251        // Validate that end is after start
252        if let (Some(start), Some(end)) = (&start_dt, &end_dt)
253            && end <= start
254        {
255            return Err(EndBeforeStartSnafu {
256                start: start.to_rfc3339(),
257                end: end.to_rfc3339(),
258            }
259            .build());
260        }
261
262        // Update the fields with canonicalized timestamps
263        if let Some(start) = start_dt {
264            self.start = Some(start.to_rfc3339());
265        }
266
267        if let Some(end) = end_dt {
268            self.end = Some(end.to_rfc3339());
269        }
270
271        Ok(())
272    }
273
274    /// Util function returns a start and optional end DateTime
275    fn parse_datetime(s: &str) -> Result<(DateTime<Utc>, Option<DateTime<Utc>>)> {
276        if let Ok(dt) = DateTime::parse_from_rfc3339(s) {
277            Ok((dt.with_timezone(&Utc), None))
278        } else {
279            let formats = ["%Y-%m-%d", "%Y-%m", "%Y"];
280            for format in &formats {
281                if let Ok(naive_date) = NaiveDate::parse_from_str(s, format) {
282                    let start = Utc.from_utc_datetime(
283                        &naive_date.and_time(NaiveTime::from_hms_opt(0, 0, 0).unwrap()),
284                    );
285                    let end = match *format {
286                        "%Y-%m-%d" => start + Duration::days(1),
287                        "%Y-%m" => {
288                            let next_month = if naive_date.month() == 12 {
289                                NaiveDate::from_ymd_opt(naive_date.year() + 1, 1, 1).unwrap()
290                            } else {
291                                NaiveDate::from_ymd_opt(
292                                    naive_date.year(),
293                                    naive_date.month() + 1,
294                                    1,
295                                )
296                                .unwrap()
297                            };
298                            Utc.from_utc_datetime(&next_month.and_hms_opt(0, 0, 0).unwrap())
299                        }
300                        "%Y" => {
301                            let next_year =
302                                NaiveDate::from_ymd_opt(naive_date.year() + 1, 1, 1).unwrap();
303                            Utc.from_utc_datetime(&next_year.and_hms_opt(0, 0, 0).unwrap())
304                        }
305                        _ => unreachable!(),
306                    };
307                    return Ok((start, Some(end)));
308                }
309            }
310            Err(InvalidDateFormatSnafu {
311                input: s.to_string(),
312            }
313            .build())
314        }
315    }
316
317    /// Util function handles durations like "1 week", "1 month", etc (unimplemented).
318    fn parse_span(s: &str) -> Result<Duration> {
319        // Simplified parsing logic
320        if let Ok(seconds) = s.parse::<i64>() {
321            Ok(Duration::seconds(seconds))
322        } else {
323            Err(InvalidSpanFormatSnafu {
324                input: s.to_string(),
325            }
326            .build())
327        }
328    }
329}
330
331/// Represents an expression with filters to query.
332#[derive(Debug, Clone, Serialize, Deserialize)]
333pub struct ColumnFilters {
334    /// Expression to apply filters to. Can be a column reference or any other LogExpr.
335    pub expr: Box<LogExpr>,
336    /// Filters to apply to the expression result. Can be empty.
337    pub filters: Vec<ContentFilter>,
338}
339
340#[derive(Clone, Debug, Serialize, Deserialize)]
341pub enum EqualValue {
342    /// Exact match with a string value.
343    String(String),
344    /// Exact match with a boolean value.
345    Boolean(bool),
346    /// Exact match with a number value.
347    Int(i64),
348    /// Exact match with an unsigned integer value.
349    UInt(u64),
350    /// Exact match with a float value.
351    Float(f64),
352}
353
354impl From<String> for EqualValue {
355    fn from(value: String) -> Self {
356        EqualValue::String(value)
357    }
358}
359
360impl From<bool> for EqualValue {
361    fn from(value: bool) -> Self {
362        EqualValue::Boolean(value)
363    }
364}
365
366impl From<i64> for EqualValue {
367    fn from(value: i64) -> Self {
368        EqualValue::Int(value)
369    }
370}
371
372impl From<f64> for EqualValue {
373    fn from(value: f64) -> Self {
374        EqualValue::Float(value)
375    }
376}
377
378impl From<u64> for EqualValue {
379    fn from(value: u64) -> Self {
380        EqualValue::UInt(value)
381    }
382}
383
384#[derive(Clone, Debug, Serialize, Deserialize)]
385pub enum ContentFilter {
386    // Search-based filters
387    /// Only match the exact content.
388    ///
389    /// For example, if the content is "pale blue dot", the filter "pale" or "pale blue" will match.
390    Exact(String),
391    /// Match the content with a prefix.
392    ///
393    /// For example, if the content is "error message", the filter "err" or "error mess" will match.
394    Prefix(String),
395    /// Match the content with a postfix. Similar to `Prefix`.
396    Postfix(String),
397    /// Match the content with a substring.
398    Contains(String),
399    /// Match the content with a regex pattern. The pattern should be a valid Rust regex.
400    Regex(String),
401
402    // Value-based filters
403    /// Content exists, a.k.a. not null.
404    Exist,
405    Between {
406        start: String,
407        end: String,
408        start_inclusive: bool,
409        end_inclusive: bool,
410    },
411    GreatThan {
412        value: String,
413        inclusive: bool,
414    },
415    LessThan {
416        value: String,
417        inclusive: bool,
418    },
419    In(Vec<String>),
420    IsTrue,
421    IsFalse,
422    Equal(EqualValue),
423
424    // Compound filters
425    Compound(Vec<ContentFilter>, ConjunctionOperator),
426}
427
428#[derive(Clone, Debug, Serialize, Deserialize)]
429pub enum ConjunctionOperator {
430    And,
431    Or,
432}
433
434/// Binary operators for LogExpr::BinaryOp.
435#[derive(Clone, Debug, Serialize, Deserialize)]
436pub enum BinaryOperator {
437    // Comparison operators
438    Eq,
439    Ne,
440    Lt,
441    Le,
442    Gt,
443    Ge,
444
445    // Arithmetic operators
446    Plus,
447    Minus,
448    Multiply,
449    Divide,
450    Modulo,
451
452    // Logical operators
453    And,
454    Or,
455}
456
457/// Controls how many adjacent lines to return.
458#[derive(Debug, Default, Serialize, Deserialize)]
459pub enum Context {
460    #[default]
461    None,
462    /// Specify the number of lines before and after the matched line separately.
463    Lines(usize, usize),
464    /// Specify the number of seconds before and after the matched line occurred.
465    Seconds(usize, usize),
466}
467
468/// Represents limit and offset parameters for query pagination.
469#[derive(Debug, Default, Serialize, Deserialize)]
470pub struct Limit {
471    /// Optional number of items to skip before starting to return results
472    pub skip: Option<usize>,
473    /// Optional number of items to return after skipping
474    pub fetch: Option<usize>,
475}
476
#[cfg(test)]
mod tests {
    use super::*;
    use crate::error::Error;

    /// Builds a `TimeFilter` from optional string slices.
    fn time_filter(start: Option<&str>, end: Option<&str>, span: Option<&str>) -> TimeFilter {
        TimeFilter {
            start: start.map(String::from),
            end: end.map(String::from),
            span: span.map(String::from),
        }
    }

    #[test]
    fn test_canonicalize() {
        // Only `start`, given as a date: `end` gets filled in.
        let mut only_start = time_filter(Some("2023-10-01"), None, None);
        only_start.canonicalize().unwrap();
        assert!(only_start.end.is_some());

        // `start` plus `span` (one day in seconds): `end` is derived.
        let mut start_and_span = time_filter(Some("2023-10-01T00:00:00Z"), None, Some("86400"));
        start_and_span.canonicalize().unwrap();
        assert_eq!(
            start_and_span.end.as_deref(),
            Some("2023-10-02T00:00:00+00:00")
        );

        // `end` plus `span` (one day in seconds): `start` is derived.
        let mut end_and_span = time_filter(None, Some("2023-10-02T00:00:00Z"), Some("86400"));
        end_and_span.canonicalize().unwrap();
        assert_eq!(
            end_and_span.start.as_deref(),
            Some("2023-10-01T00:00:00+00:00")
        );

        // Both `start` and `end`: each is rewritten in canonical form.
        let mut start_and_end = time_filter(
            Some("2023-10-01T00:00:00Z"),
            Some("2023-10-02T00:00:00Z"),
            None,
        );
        start_and_end.canonicalize().unwrap();
        assert_eq!(
            start_and_end.start.as_deref(),
            Some("2023-10-01T00:00:00+00:00")
        );
        assert_eq!(
            start_and_end.end.as_deref(),
            Some("2023-10-02T00:00:00+00:00")
        );

        // An unparsable date is rejected.
        let mut bad_date = time_filter(Some("invalid-date"), None, None);
        assert!(matches!(
            bad_date.canonicalize(),
            Err(Error::InvalidDateFormat { .. })
        ));

        // Nothing provided at all is rejected.
        let mut empty = time_filter(None, None, None);
        assert!(matches!(
            empty.canonicalize(),
            Err(Error::InvalidTimeFilter { .. })
        ));

        // `end` earlier than `start` is rejected.
        let mut inverted = time_filter(
            Some("2023-10-02T00:00:00Z"),
            Some("2023-10-01T00:00:00Z"),
            None,
        );
        assert!(matches!(
            inverted.canonicalize(),
            Err(Error::EndBeforeStart { .. })
        ));
    }
}