1use std::fmt::{Display, Formatter};
16
17use common_recordbatch::OrderOption;
18use datafusion_expr::expr::Expr;
19use strum::Display;
20
21use crate::storage::SequenceNumber;
22
/// Selector describing which rows of a time series a scan should return.
///
/// Carried by `ScanRequest::series_row_selector`. The derived `Display` is the
/// `strum` derive imported at the top of this file; `ScanRequest`'s own
/// `Display` impl uses it to print the selector.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Display)]
pub enum TimeSeriesRowSelector {
    /// NOTE(review): presumably "keep only the last row of each series" —
    /// the exact semantics are not visible in this file; confirm with the scanner.
    LastRow,
}
29
/// Describes how time-series data should be distributed in a scan's output.
///
/// Carried by `ScanRequest::distribution`. The derived `Display` is the
/// `strum` derive imported at the top of this file; `ScanRequest`'s own
/// `Display` impl uses it to print the distribution.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Display)]
pub enum TimeSeriesDistribution {
    /// NOTE(review): presumably output is grouped by time window first —
    /// confirm against the scanner implementation.
    TimeWindowed,
    /// NOTE(review): presumably output is grouped by series first —
    /// confirm against the scanner implementation.
    PerSeries,
}
40
/// Parameters of a scan.
///
/// Every field is either an `Option` or a `Vec`, so the derived `Default`
/// yields a request with nothing set (all `None`, no filters).
#[derive(Default, Clone, Debug, PartialEq, Eq)]
pub struct ScanRequest {
    /// Column indices to read.
    /// NOTE(review): presumably `None` means "all columns" and the indices
    /// refer to the table schema — confirm with callers.
    pub projection: Option<Vec<usize>>,
    /// Filter expressions attached to the scan; empty when there are none.
    pub filters: Vec<Expr>,
    /// Requested ordering of the output.
    /// NOTE(review): presumably a hint rather than a guarantee — confirm.
    pub output_ordering: Option<Vec<OrderOption>>,
    /// Row limit for the scan.
    /// NOTE(review): whether this is an exact cap or a "need at least this
    /// many rows" hint is not visible here — confirm.
    pub limit: Option<usize>,
    /// Optional row selector applied per time series.
    pub series_row_selector: Option<TimeSeriesRowSelector>,
    /// Optional bound on the sequence number of memtable rows.
    /// NOTE(review): presumably an upper bound; inclusivity is not visible
    /// here — confirm.
    pub memtable_max_sequence: Option<SequenceNumber>,
    /// Optional bound on the sequence number of memtable rows.
    /// NOTE(review): presumably a lower bound; inclusivity is not visible
    /// here — confirm.
    pub memtable_min_sequence: Option<SequenceNumber>,
    /// Optional bound on the sequence number for SST data.
    /// NOTE(review): presumably a lower bound; inclusivity is not visible
    /// here — confirm.
    pub sst_min_sequence: Option<SequenceNumber>,
    /// Optional distribution of time-series data in the output.
    pub distribution: Option<TimeSeriesDistribution>,
}
70
71impl Display for ScanRequest {
72 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
73 enum Delimiter {
74 None,
75 Init,
76 }
77
78 impl Delimiter {
79 fn as_str(&mut self) -> &str {
80 match self {
81 Delimiter::None => {
82 *self = Delimiter::Init;
83 ""
84 }
85 Delimiter::Init => ", ",
86 }
87 }
88 }
89
90 let mut delimiter = Delimiter::None;
91
92 write!(f, "ScanRequest {{ ")?;
93 if let Some(projection) = &self.projection {
94 write!(f, "{}projection: {:?}", delimiter.as_str(), projection)?;
95 }
96 if !self.filters.is_empty() {
97 write!(
98 f,
99 "{}filters: [{}]",
100 delimiter.as_str(),
101 self.filters
102 .iter()
103 .map(|f| f.to_string())
104 .collect::<Vec<_>>()
105 .join(", ")
106 )?;
107 }
108 if let Some(output_ordering) = &self.output_ordering {
109 write!(
110 f,
111 "{}output_ordering: {:?}",
112 delimiter.as_str(),
113 output_ordering
114 )?;
115 }
116 if let Some(limit) = &self.limit {
117 write!(f, "{}limit: {}", delimiter.as_str(), limit)?;
118 }
119 if let Some(series_row_selector) = &self.series_row_selector {
120 write!(
121 f,
122 "{}series_row_selector: {}",
123 delimiter.as_str(),
124 series_row_selector
125 )?;
126 }
127 if let Some(sequence) = &self.memtable_max_sequence {
128 write!(f, "{}sequence: {}", delimiter.as_str(), sequence)?;
129 }
130 if let Some(sst_min_sequence) = &self.sst_min_sequence {
131 write!(
132 f,
133 "{}sst_min_sequence: {}",
134 delimiter.as_str(),
135 sst_min_sequence
136 )?;
137 }
138 if let Some(distribution) = &self.distribution {
139 write!(f, "{}distribution: {}", delimiter.as_str(), distribution)?;
140 }
141 write!(f, " }}")
142 }
143}
144
#[cfg(test)]
mod tests {
    use datafusion_expr::{Operator, binary_expr, col, lit};

    use super::*;

    /// Checks the `Display` output of `ScanRequest` for an empty request and
    /// for several combinations of projection, filters and limit.
    #[test]
    fn test_display_scan_request() {
        // The two predicates shared by every filter-bearing case.
        let sample_filters = || {
            vec![
                binary_expr(col("i"), Operator::Gt, lit(1)),
                binary_expr(col("s"), Operator::Eq, lit("x")),
            ]
        };

        // Each entry pairs a request with its expected rendering.
        let cases = vec![
            // Nothing set: only the surrounding braces are printed.
            (ScanRequest::default(), "ScanRequest { }"),
            // Projection, filters and limit together.
            (
                ScanRequest {
                    projection: Some(vec![1, 2]),
                    filters: sample_filters(),
                    limit: Some(10),
                    ..Default::default()
                },
                r#"ScanRequest { projection: [1, 2], filters: [i > Int32(1), s = Utf8("x")], limit: 10 }"#,
            ),
            // Filters are the first field printed, so no leading separator.
            (
                ScanRequest {
                    filters: sample_filters(),
                    limit: Some(10),
                    ..Default::default()
                },
                r#"ScanRequest { filters: [i > Int32(1), s = Utf8("x")], limit: 10 }"#,
            ),
            // The empty filter list is skipped between projection and limit.
            (
                ScanRequest {
                    projection: Some(vec![1, 2]),
                    limit: Some(10),
                    ..Default::default()
                },
                "ScanRequest { projection: [1, 2], limit: 10 }",
            ),
        ];

        for (request, expected) in cases {
            assert_eq!(request.to_string(), expected);
        }
    }
}