store_api/storage/
requests.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::fmt::{Display, Formatter};
16
17use common_recordbatch::OrderOption;
18use datafusion_expr::expr::Expr;
19use strum::Display;
20
21use crate::storage::SequenceNumber;
22
23/// A hint on how to select rows from a time-series.
24#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Display)]
25pub enum TimeSeriesRowSelector {
26    /// Only keep the last row of each time-series.
27    LastRow,
28}
29
30/// A hint on how to distribute time-series data on the scan output.
31#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Display)]
32pub enum TimeSeriesDistribution {
33    /// Data are distributed by time window first. The scanner will
34    /// return all data within one time window before moving to the next one.
35    TimeWindowed,
36    /// Data are organized by time-series first. The scanner will return
37    /// all data for one time-series before moving to the next one.
38    PerSeries,
39}
40
41#[derive(Default, Clone, Debug, PartialEq, Eq)]
42pub struct ScanRequest {
43    /// Indices of columns to read, `None` to read all columns. This indices is
44    /// based on table schema.
45    pub projection: Option<Vec<usize>>,
46    /// Filters pushed down
47    pub filters: Vec<Expr>,
48    /// Expected output ordering. This is only a hint and isn't guaranteed.
49    pub output_ordering: Option<Vec<OrderOption>>,
50    /// limit can be used to reduce the amount scanned
51    /// from the datasource as a performance optimization.
52    /// If set, it contains the amount of rows needed by the caller,
53    /// The data source should return *at least* this number of rows if available.
54    pub limit: Option<usize>,
55    /// Optional hint to select rows from time-series.
56    pub series_row_selector: Option<TimeSeriesRowSelector>,
57    /// Optional constraint on the sequence number of the rows to read.
58    /// If set, only rows with a sequence number **lesser or equal** to this value
59    /// will be returned.
60    pub memtable_max_sequence: Option<SequenceNumber>,
61    /// Optional constraint on the minimal sequence number in the memtable.
62    /// If set, only the memtables that contain sequences **greater than** this value will be scanned
63    pub memtable_min_sequence: Option<SequenceNumber>,
64    /// Optional constraint on the minimal sequence number in the SST files.
65    /// If set, only the SST files that contain sequences greater than this value will be scanned.
66    pub sst_min_sequence: Option<SequenceNumber>,
67    /// Optional hint for the distribution of time-series data.
68    pub distribution: Option<TimeSeriesDistribution>,
69}
70
71impl Display for ScanRequest {
72    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
73        enum Delimiter {
74            None,
75            Init,
76        }
77
78        impl Delimiter {
79            fn as_str(&mut self) -> &str {
80                match self {
81                    Delimiter::None => {
82                        *self = Delimiter::Init;
83                        ""
84                    }
85                    Delimiter::Init => ", ",
86                }
87            }
88        }
89
90        let mut delimiter = Delimiter::None;
91
92        write!(f, "ScanRequest {{ ")?;
93        if let Some(projection) = &self.projection {
94            write!(f, "{}projection: {:?}", delimiter.as_str(), projection)?;
95        }
96        if !self.filters.is_empty() {
97            write!(
98                f,
99                "{}filters: [{}]",
100                delimiter.as_str(),
101                self.filters
102                    .iter()
103                    .map(|f| f.to_string())
104                    .collect::<Vec<_>>()
105                    .join(", ")
106            )?;
107        }
108        if let Some(output_ordering) = &self.output_ordering {
109            write!(
110                f,
111                "{}output_ordering: {:?}",
112                delimiter.as_str(),
113                output_ordering
114            )?;
115        }
116        if let Some(limit) = &self.limit {
117            write!(f, "{}limit: {}", delimiter.as_str(), limit)?;
118        }
119        if let Some(series_row_selector) = &self.series_row_selector {
120            write!(
121                f,
122                "{}series_row_selector: {}",
123                delimiter.as_str(),
124                series_row_selector
125            )?;
126        }
127        if let Some(sequence) = &self.memtable_max_sequence {
128            write!(f, "{}sequence: {}", delimiter.as_str(), sequence)?;
129        }
130        if let Some(sst_min_sequence) = &self.sst_min_sequence {
131            write!(
132                f,
133                "{}sst_min_sequence: {}",
134                delimiter.as_str(),
135                sst_min_sequence
136            )?;
137        }
138        if let Some(distribution) = &self.distribution {
139            write!(f, "{}distribution: {}", delimiter.as_str(), distribution)?;
140        }
141        write!(f, " }}")
142    }
143}
144
#[cfg(test)]
mod tests {
    use datafusion_expr::{Operator, binary_expr, col, lit};

    use super::*;

    #[test]
    fn test_display_scan_request() {
        // `i > 1` and `s = 'x'`, shared by the cases that push down filters.
        let sample_filters = || {
            vec![
                binary_expr(col("i"), Operator::Gt, lit(1)),
                binary_expr(col("s"), Operator::Eq, lit("x")),
            ]
        };

        // Each entry pairs a request with its expected `Display` rendering.
        let cases = [
            (ScanRequest::default(), "ScanRequest {  }"),
            (
                ScanRequest {
                    projection: Some(vec![1, 2]),
                    filters: sample_filters(),
                    limit: Some(10),
                    ..Default::default()
                },
                r#"ScanRequest { projection: [1, 2], filters: [i > Int32(1), s = Utf8("x")], limit: 10 }"#,
            ),
            (
                ScanRequest {
                    filters: sample_filters(),
                    limit: Some(10),
                    ..Default::default()
                },
                r#"ScanRequest { filters: [i > Int32(1), s = Utf8("x")], limit: 10 }"#,
            ),
            (
                ScanRequest {
                    projection: Some(vec![1, 2]),
                    limit: Some(10),
                    ..Default::default()
                },
                "ScanRequest { projection: [1, 2], limit: 10 }",
            ),
        ];

        for (request, expected) in &cases {
            assert_eq!(
                request.to_string(),
                *expected,
                "unexpected rendering for {request:?}"
            );
        }
    }
}