1use std::fmt::{Display, Formatter};
16
17use common_recordbatch::OrderOption;
18use datafusion_expr::expr::Expr;
19use strum::Display;
20
21use crate::storage::SequenceNumber;
22
/// Row-selection option that a [`ScanRequest`] may carry in its
/// `series_row_selector` field.
///
/// `Display` is derived via `strum`, so a variant is printed using its name.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Display)]
pub enum TimeSeriesRowSelector {
    /// Select the last row.
    ///
    /// NOTE(review): presumably "last" means the most recent row of each
    /// time series — confirm against the scanner that consumes this value.
    LastRow,
}
29
/// Distribution option that a [`ScanRequest`] may carry in its `distribution`
/// field.
///
/// `Display` is derived via `strum`, so a variant is printed using its name.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Display)]
pub enum TimeSeriesDistribution {
    /// NOTE(review): presumably the scan output is organised by time window
    /// first — confirm against the scanner implementation.
    TimeWindowed,
    /// NOTE(review): presumably the scan output is organised one time series
    /// at a time — confirm against the scanner implementation.
    PerSeries,
}
40
/// Parameters describing a scan.
///
/// Every field is optional: the derived `Default` yields a request with all
/// `Option` fields set to `None` and an empty `filters` list. The `Display`
/// implementation prints only the fields that are set.
#[derive(Default, Clone, Debug, PartialEq, Eq)]
pub struct ScanRequest {
    /// Optional list of indices to project.
    ///
    /// NOTE(review): presumably column indices, with `None` meaning "all
    /// columns" — confirm which schema the indices refer to.
    pub projection: Option<Vec<usize>>,
    /// Filter expressions attached to the scan; may be empty.
    pub filters: Vec<Expr>,
    /// Optional ordering requested for the output.
    ///
    /// NOTE(review): whether this is a guarantee or only a hint is not visible
    /// here — confirm with the scan implementation.
    pub output_ordering: Option<Vec<OrderOption>>,
    /// Optional row limit.
    ///
    /// NOTE(review): presumably the number of rows the caller needs — confirm
    /// whether the scanner treats it as an exact cap or a lower bound.
    pub limit: Option<usize>,
    /// Optional row selector applied to time series.
    pub series_row_selector: Option<TimeSeriesRowSelector>,
    /// Optional sequence-number constraint.
    ///
    /// NOTE(review): presumably an upper bound on the sequence of rows that
    /// are visible to the scan — confirm.
    pub sequence: Option<SequenceNumber>,
    /// Optional minimum sequence number related to SST files.
    ///
    /// NOTE(review): presumably used to skip SST files below this sequence —
    /// confirm whether the bound is inclusive.
    pub sst_min_sequence: Option<SequenceNumber>,
    /// Optional distribution requested for the scan output.
    pub distribution: Option<TimeSeriesDistribution>,
}
67
68impl Display for ScanRequest {
69 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
70 enum Delimiter {
71 None,
72 Init,
73 }
74
75 impl Delimiter {
76 fn as_str(&mut self) -> &str {
77 match self {
78 Delimiter::None => {
79 *self = Delimiter::Init;
80 ""
81 }
82 Delimiter::Init => ", ",
83 }
84 }
85 }
86
87 let mut delimiter = Delimiter::None;
88
89 write!(f, "ScanRequest {{ ")?;
90 if let Some(projection) = &self.projection {
91 write!(f, "{}projection: {:?}", delimiter.as_str(), projection)?;
92 }
93 if !self.filters.is_empty() {
94 write!(
95 f,
96 "{}filters: [{}]",
97 delimiter.as_str(),
98 self.filters
99 .iter()
100 .map(|f| f.to_string())
101 .collect::<Vec<_>>()
102 .join(", ")
103 )?;
104 }
105 if let Some(output_ordering) = &self.output_ordering {
106 write!(
107 f,
108 "{}output_ordering: {:?}",
109 delimiter.as_str(),
110 output_ordering
111 )?;
112 }
113 if let Some(limit) = &self.limit {
114 write!(f, "{}limit: {}", delimiter.as_str(), limit)?;
115 }
116 if let Some(series_row_selector) = &self.series_row_selector {
117 write!(
118 f,
119 "{}series_row_selector: {}",
120 delimiter.as_str(),
121 series_row_selector
122 )?;
123 }
124 if let Some(sequence) = &self.sequence {
125 write!(f, "{}sequence: {}", delimiter.as_str(), sequence)?;
126 }
127 if let Some(sst_min_sequence) = &self.sst_min_sequence {
128 write!(
129 f,
130 "{}sst_min_sequence: {}",
131 delimiter.as_str(),
132 sst_min_sequence
133 )?;
134 }
135 if let Some(distribution) = &self.distribution {
136 write!(f, "{}distribution: {}", delimiter.as_str(), distribution)?;
137 }
138 write!(f, " }}")
139 }
140}
141
#[cfg(test)]
mod tests {
    use datafusion_expr::{binary_expr, col, lit, Operator};

    use super::*;

    /// Checks the `Display` output of `ScanRequest` for several field
    /// combinations.
    #[test]
    fn test_display_scan_request() {
        // Nothing set: the formatter still writes the opening "ScanRequest { "
        // and the closing " }", so two spaces sit between the braces.
        let request = ScanRequest::default();
        assert_eq!(request.to_string(), "ScanRequest {  }");

        // Projection, filters and limit together, in declaration order.
        let request = ScanRequest {
            projection: Some(vec![1, 2]),
            filters: vec![
                binary_expr(col("i"), Operator::Gt, lit(1)),
                binary_expr(col("s"), Operator::Eq, lit("x")),
            ],
            limit: Some(10),
            ..Default::default()
        };
        assert_eq!(
            request.to_string(),
            r#"ScanRequest { projection: [1, 2], filters: [i > Int32(1), s = Utf8("x")], limit: 10 }"#
        );

        // No projection: the first emitted field must not be preceded by ", ".
        let request = ScanRequest {
            filters: vec![
                binary_expr(col("i"), Operator::Gt, lit(1)),
                binary_expr(col("s"), Operator::Eq, lit("x")),
            ],
            limit: Some(10),
            ..Default::default()
        };
        assert_eq!(
            request.to_string(),
            r#"ScanRequest { filters: [i > Int32(1), s = Utf8("x")], limit: 10 }"#
        );

        // Empty filters are skipped entirely.
        let request = ScanRequest {
            projection: Some(vec![1, 2]),
            limit: Some(10),
            ..Default::default()
        };
        assert_eq!(
            request.to_string(),
            "ScanRequest { projection: [1, 2], limit: 10 }"
        );

        // The hint enums are printed through their derived `Display`, i.e. by
        // variant name.
        let request = ScanRequest {
            series_row_selector: Some(TimeSeriesRowSelector::LastRow),
            distribution: Some(TimeSeriesDistribution::PerSeries),
            ..Default::default()
        };
        assert_eq!(
            request.to_string(),
            "ScanRequest { series_row_selector: LastRow, distribution: PerSeries }"
        );
    }
}