log_query/log_query.rs
1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use chrono::{DateTime, Datelike, Duration, NaiveDate, NaiveTime, TimeZone, Utc};
16use serde::{Deserialize, Serialize};
17use table::table_name::TableName;
18
19use crate::error::{
20 EndBeforeStartSnafu, InvalidDateFormatSnafu, InvalidSpanFormatSnafu, InvalidTimeFilterSnafu,
21 Result,
22};
23
/// GreptimeDB's log query request.
#[derive(Debug, Serialize, Deserialize)]
pub struct LogQuery {
    // Global query parameters
    /// A fully qualified table name to query logs from.
    pub table: TableName,
    /// Specifies the time range for the log query. See [`TimeFilter`] for more details.
    pub time_filter: TimeFilter,
    /// Controls row skipping and fetch on the result set. See [`Limit`].
    pub limit: Limit,
    /// Columns to return in the result set.
    ///
    /// The columns can be either from the original log or derived from processing exprs.
    /// Default (empty) means all columns.
    ///
    /// TODO(ruihang): Do we need negative select?
    pub columns: Vec<String>,

    // Filters
    /// Conjunction of filters to apply for the raw logs.
    ///
    /// Filters here can only refer to the columns from the original log.
    pub filters: Vec<ColumnFilters>,
    /// Adjacent lines to return. Applies to all filters above.
    ///
    /// TODO(ruihang): Do we need per-filter context?
    pub context: Context,

    // Processors
    /// Expressions to calculate after filter.
    pub exprs: Vec<LogExpr>,
}
56
/// Expression to calculate on log after filtering.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum LogExpr {
    /// Identifier referenced by name (e.g., a column name).
    NamedIdent(String),
    /// Identifier referenced by its position in the result set.
    PositionalIdent(usize),
    /// A literal value, carried in its string form.
    Literal(String),
    /// Invocation of a scalar function `name` over `args`.
    ScalarFunc {
        name: String,
        args: Vec<LogExpr>,
        /// Optional name for the result column.
        alias: Option<String>,
    },
    /// Invocation of an aggregate function `name` over `args`.
    AggrFunc {
        name: String,
        args: Vec<LogExpr>,
        /// Optional range function parameter. Stands for the time range for both step and align.
        range: Option<String>,
        /// Grouping expressions for the aggregation.
        by: Vec<LogExpr>,
        /// Optional name for the result column.
        alias: Option<String>,
    },
    /// Decompose a compound value (per `schema`) into several typed fields.
    Decompose {
        expr: Box<LogExpr>,
        /// JSON, CSV, etc.
        schema: String,
        /// Fields with type name to extract from the decomposed value.
        fields: Vec<(String, String)>,
    },
    /// A binary operation (operator carried as a string) between two expressions.
    BinaryOp {
        left: Box<LogExpr>,
        op: String,
        right: Box<LogExpr>,
    },
    /// Rename the result of `expr` to `alias`.
    Alias {
        expr: Box<LogExpr>,
        alias: String,
    },
    /// Apply a [`ContentFilter`] to the result of `expr`.
    Filter {
        expr: Box<LogExpr>,
        filter: ContentFilter,
    },
}
97
98impl Default for LogQuery {
99 fn default() -> Self {
100 Self {
101 table: TableName::new("", "", ""),
102 time_filter: Default::default(),
103 filters: vec![],
104 limit: Limit::default(),
105 context: Default::default(),
106 columns: vec![],
107 exprs: vec![],
108 }
109 }
110}
111
/// Represents a time range for log query.
///
/// This struct allows various formats to express a time range from the user side
/// for best flexibility:
/// - Only `start` is provided: the `start` string can be any valid "date" or vaguer
///   content. For example: "2024-12-01", "2024-12", "2024", etc. It will be treated
///   as an time range corresponding to the provided date. E.g., "2024-12-01" refers
///   to the entire 24 hours in that day. In this case, the `start` field cannot be a
///   timestamp (like "2024-12-01T12:00:00Z").
/// - Both `start` and `end` are provided: the `start` and `end` strings can be either
///   a date or a timestamp. The `end` field is exclusive (`[start, end)`). When
///   `start` is a date it implies the start of the day, and when `end` is a date it
///   implies the end of the day.
/// - `span` with `start` OR `end`: the `span` string can be any valid "interval"
///   For example: "1024s", "1 week", "1 month", etc. The `span` field is applied to
///   the `start` or `end` field to calculate the other one correspondingly. If `start`
///   is provided, `end` is calculated as `start + span` and vice versa.
/// - Only `span` is provided: the `span` string can be any valid "interval" as mentioned
///   above. In this case, the current time (on the server side) is considered as the `end`.
/// - All fields are provided: in this case, the `start` and `end` fields are considered
///   with higher priority, and the `span` field is ignored.
///
/// This struct doesn't require a timezone to be presented. When the timezone is not
/// provided, it will fill the default timezone with the same rules akin to other queries.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct TimeFilter {
    /// Inclusive start of the range: a (possibly vague) date or an RFC 3339
    /// timestamp. Rewritten to a timestamp by [`TimeFilter::canonicalize`].
    pub start: Option<String>,
    /// Exclusive end of the range: a date or an RFC 3339 timestamp.
    pub end: Option<String>,
    /// Length of the range, applied to `start` or `end` to derive the other.
    /// Currently only a plain number of seconds (e.g. "86400") is parsed.
    pub span: Option<String>,
}
142
143impl TimeFilter {
144 /// Validate and canonicalize the time filter.
145 ///
146 /// This function will try to fill the missing fields and convert all dates to timestamps
147 #[allow(unused_assignments)] // false positive
148 pub fn canonicalize(&mut self) -> Result<()> {
149 let mut start_dt = None;
150 let mut end_dt = None;
151
152 if self.start.is_some() && self.end.is_none() && self.span.is_none() {
153 // Only 'start' is provided
154 let s = self.start.as_ref().unwrap();
155 let (start, end_opt) = Self::parse_datetime(s)?;
156 if end_opt.is_none() {
157 return Err(InvalidTimeFilterSnafu {
158 filter: self.clone(),
159 }
160 .build());
161 }
162 start_dt = Some(start);
163 end_dt = end_opt;
164 } else if self.start.is_some() && self.end.is_some() {
165 // Both 'start' and 'end' are provided
166 let (start, _) = Self::parse_datetime(self.start.as_ref().unwrap())?;
167 let (end, _) = Self::parse_datetime(self.end.as_ref().unwrap())?;
168 start_dt = Some(start);
169 end_dt = Some(end);
170 } else if self.span.is_some() && (self.start.is_some() || self.end.is_some()) {
171 // 'span' with 'start' or 'end'
172 let span = Self::parse_span(self.span.as_ref().unwrap())?;
173 if self.start.is_some() {
174 let (start, _) = Self::parse_datetime(self.start.as_ref().unwrap())?;
175 let end = start + span;
176 start_dt = Some(start);
177 end_dt = Some(end);
178 } else {
179 let (end, _) = Self::parse_datetime(self.end.as_ref().unwrap())?;
180 let start = end - span;
181 start_dt = Some(start);
182 end_dt = Some(end);
183 }
184 } else if self.span.is_some() && self.start.is_none() && self.end.is_none() {
185 // Only 'span' is provided
186 let span = Self::parse_span(self.span.as_ref().unwrap())?;
187 let end = Utc::now();
188 let start = end - span;
189 start_dt = Some(start);
190 end_dt = Some(end);
191 } else if self.start.is_some() && self.span.is_some() && self.end.is_some() {
192 // All fields are provided; 'start' and 'end' take priority
193 let (start, _) = Self::parse_datetime(self.start.as_ref().unwrap())?;
194 let (end, _) = Self::parse_datetime(self.end.as_ref().unwrap())?;
195 start_dt = Some(start);
196 end_dt = Some(end);
197 } else {
198 // Exception
199 return Err(InvalidTimeFilterSnafu {
200 filter: self.clone(),
201 }
202 .build());
203 }
204
205 // Validate that end is after start
206 if let (Some(start), Some(end)) = (&start_dt, &end_dt) {
207 if end <= start {
208 return Err(EndBeforeStartSnafu {
209 start: start.to_rfc3339(),
210 end: end.to_rfc3339(),
211 }
212 .build());
213 }
214 }
215
216 // Update the fields with canonicalized timestamps
217 if let Some(start) = start_dt {
218 self.start = Some(start.to_rfc3339());
219 }
220
221 if let Some(end) = end_dt {
222 self.end = Some(end.to_rfc3339());
223 }
224
225 Ok(())
226 }
227
228 /// Util function returns a start and optional end DateTime
229 fn parse_datetime(s: &str) -> Result<(DateTime<Utc>, Option<DateTime<Utc>>)> {
230 if let Ok(dt) = DateTime::parse_from_rfc3339(s) {
231 Ok((dt.with_timezone(&Utc), None))
232 } else {
233 let formats = ["%Y-%m-%d", "%Y-%m", "%Y"];
234 for format in &formats {
235 if let Ok(naive_date) = NaiveDate::parse_from_str(s, format) {
236 let start = Utc.from_utc_datetime(
237 &naive_date.and_time(NaiveTime::from_hms_opt(0, 0, 0).unwrap()),
238 );
239 let end = match *format {
240 "%Y-%m-%d" => start + Duration::days(1),
241 "%Y-%m" => {
242 let next_month = if naive_date.month() == 12 {
243 NaiveDate::from_ymd_opt(naive_date.year() + 1, 1, 1).unwrap()
244 } else {
245 NaiveDate::from_ymd_opt(
246 naive_date.year(),
247 naive_date.month() + 1,
248 1,
249 )
250 .unwrap()
251 };
252 Utc.from_utc_datetime(&next_month.and_hms_opt(0, 0, 0).unwrap())
253 }
254 "%Y" => {
255 let next_year =
256 NaiveDate::from_ymd_opt(naive_date.year() + 1, 1, 1).unwrap();
257 Utc.from_utc_datetime(&next_year.and_hms_opt(0, 0, 0).unwrap())
258 }
259 _ => unreachable!(),
260 };
261 return Ok((start, Some(end)));
262 }
263 }
264 Err(InvalidDateFormatSnafu {
265 input: s.to_string(),
266 }
267 .build())
268 }
269 }
270
271 /// Util function handles durations like "1 week", "1 month", etc (unimplemented).
272 fn parse_span(s: &str) -> Result<Duration> {
273 // Simplified parsing logic
274 if let Ok(seconds) = s.parse::<i64>() {
275 Ok(Duration::seconds(seconds))
276 } else {
277 Err(InvalidSpanFormatSnafu {
278 input: s.to_string(),
279 }
280 .build())
281 }
282 }
283}
284
/// Represents a column with filters to query.
#[derive(Debug, Serialize, Deserialize)]
pub struct ColumnFilters {
    /// Case-sensitive column name to query.
    pub column_name: String,
    /// Filters to apply to the column. Can be empty.
    // NOTE(review): how multiple filters combine (AND vs OR) is decided by the
    // engine consuming this struct — not visible here; confirm against caller.
    pub filters: Vec<ContentFilter>,
}
293
/// A single predicate over a column's content.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum ContentFilter {
    // Search-based filters
    /// Only match the exact content.
    ///
    /// For example, if the content is "pale blue dot", the filter "pale" or "pale blue" will match.
    Exact(String),
    /// Match the content with a prefix.
    ///
    /// For example, if the content is "error message", the filter "err" or "error mess" will match.
    Prefix(String),
    /// Match the content with a postfix. Similar to `Prefix`.
    Postfix(String),
    /// Match the content with a substring.
    Contains(String),
    /// Match the content with a regex pattern. The pattern should be a valid Rust regex.
    Regex(String),

    // Value-based filters
    /// Content exists, a.k.a. not null.
    Exist,
    /// Value lies between `start` and `end`; inclusiveness of each bound is
    /// controlled separately.
    Between {
        start: String,
        end: String,
        start_inclusive: bool,
        end_inclusive: bool,
    },
    /// Value is greater than `value`; `inclusive` makes it greater-or-equal.
    // NOTE(review): variant name has a typo ("GreatThan"); renaming it would
    // change the serde-serialized representation, so it is kept as-is.
    GreatThan {
        value: String,
        inclusive: bool,
    },
    /// Value is less than `value`; `inclusive` makes it less-or-equal.
    LessThan {
        value: String,
        inclusive: bool,
    },
    /// Value is one of the listed candidates.
    In(Vec<String>),
    // TODO(ruihang): arithmetic operations

    // Compound filters
    /// Combine several filters with the given logical operator.
    Compound(Vec<ContentFilter>, BinaryOperator),
}
335
/// Logical operator used to combine the sub-filters of a
/// [`ContentFilter::Compound`] filter.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum BinaryOperator {
    And,
    Or,
}
341
/// Controls how many adjacent lines to return.
#[derive(Debug, Default, Serialize, Deserialize)]
pub enum Context {
    /// No context, only the matched lines themselves.
    #[default]
    None,
    /// Specify the number of lines before and after the matched line separately.
    Lines(usize, usize),
    /// Specify the number of seconds before and after the matched line occurred.
    Seconds(usize, usize),
}
352
/// Represents limit and offset parameters for query pagination.
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct Limit {
    /// Optional number of items to skip before starting to return results.
    pub skip: Option<usize>,
    /// Optional number of items to return after skipping.
    pub fetch: Option<usize>,
}
361
#[cfg(test)]
mod tests {
    use super::*;
    use crate::error::Error;

    /// Happy paths and basic error paths of [`TimeFilter::canonicalize`].
    #[test]
    fn test_canonicalize() {
        // with 'start' only
        let mut tf = TimeFilter {
            start: Some("2023-10-01".to_string()),
            end: None,
            span: None,
        };
        tf.canonicalize().unwrap();
        assert!(tf.end.is_some());

        // with 'start' and 'span'
        let mut tf = TimeFilter {
            start: Some("2023-10-01T00:00:00Z".to_string()),
            end: None,
            span: Some("86400".to_string()), // 1 day in seconds
        };
        tf.canonicalize().unwrap();
        assert_eq!(tf.end.as_ref().unwrap(), "2023-10-02T00:00:00+00:00");

        // with 'end' and 'span'
        let mut tf = TimeFilter {
            start: None,
            end: Some("2023-10-02T00:00:00Z".to_string()),
            span: Some("86400".to_string()), // 1 day in seconds
        };
        tf.canonicalize().unwrap();
        assert_eq!(tf.start.as_ref().unwrap(), "2023-10-01T00:00:00+00:00");

        // with both 'start' and 'end'
        let mut tf = TimeFilter {
            start: Some("2023-10-01T00:00:00Z".to_string()),
            end: Some("2023-10-02T00:00:00Z".to_string()),
            span: None,
        };
        tf.canonicalize().unwrap();
        assert_eq!(tf.start.as_ref().unwrap(), "2023-10-01T00:00:00+00:00");
        assert_eq!(tf.end.as_ref().unwrap(), "2023-10-02T00:00:00+00:00");

        // with invalid date format
        let mut tf = TimeFilter {
            start: Some("invalid-date".to_string()),
            end: None,
            span: None,
        };
        let result = tf.canonicalize();
        assert!(matches!(result, Err(Error::InvalidDateFormat { .. })));

        // with missing 'start' and 'end'
        let mut tf = TimeFilter {
            start: None,
            end: None,
            span: None,
        };
        let result = tf.canonicalize();
        assert!(matches!(result, Err(Error::InvalidTimeFilter { .. })));

        // 'end' is before 'start'
        let mut tf = TimeFilter {
            start: Some("2023-10-02T00:00:00Z".to_string()),
            end: Some("2023-10-01T00:00:00Z".to_string()),
            span: None,
        };
        let result = tf.canonicalize();
        assert!(matches!(result, Err(Error::EndBeforeStart { .. })));
    }

    /// Branches that the original test left uncovered.
    #[test]
    fn test_canonicalize_edge_cases() {
        // with 'span' only: 'end' is filled with the current server time
        let mut tf = TimeFilter {
            start: None,
            end: None,
            span: Some("3600".to_string()),
        };
        tf.canonicalize().unwrap();
        assert!(tf.start.is_some());
        assert!(tf.end.is_some());

        // a bare timestamp as 'start' without 'end' or 'span' cannot denote a
        // range by itself
        let mut tf = TimeFilter {
            start: Some("2023-10-01T00:00:00Z".to_string()),
            end: None,
            span: None,
        };
        let result = tf.canonicalize();
        assert!(matches!(result, Err(Error::InvalidTimeFilter { .. })));

        // 'end' alone (without 'span') is not enough either
        let mut tf = TimeFilter {
            start: None,
            end: Some("2023-10-01T00:00:00Z".to_string()),
            span: None,
        };
        let result = tf.canonicalize();
        assert!(matches!(result, Err(Error::InvalidTimeFilter { .. })));

        // when all three fields are set, 'start' and 'end' win and 'span' is
        // ignored
        let mut tf = TimeFilter {
            start: Some("2023-10-01T00:00:00Z".to_string()),
            end: Some("2023-10-02T00:00:00Z".to_string()),
            span: Some("1".to_string()),
        };
        tf.canonicalize().unwrap();
        assert_eq!(tf.start.as_ref().unwrap(), "2023-10-01T00:00:00+00:00");
        assert_eq!(tf.end.as_ref().unwrap(), "2023-10-02T00:00:00+00:00");

        // the range is half-open `[start, end)`, so an empty range is rejected
        let mut tf = TimeFilter {
            start: Some("2023-10-01T00:00:00Z".to_string()),
            end: Some("2023-10-01T00:00:00Z".to_string()),
            span: None,
        };
        let result = tf.canonicalize();
        assert!(matches!(result, Err(Error::EndBeforeStart { .. })));
    }
}
433}