1use std::fmt::{Display, Formatter};
16
17use common_error::ext::BoxedError;
18use common_recordbatch::OrderOption;
19use datafusion_expr::expr::Expr;
20pub use datatypes::schema::{VectorDistanceMetric, VectorIndexEngineType};
22use strum::Display;
23
24use crate::storage::{ColumnId, ProjectionInput, SequenceNumber};
25
/// Parameters of a vector-similarity search attached to a scan
/// (see [`ScanRequest::vector_search`]).
///
/// When a [`ScanRequest`] is rendered with `Display`, only `column_id`, `k` and `metric`
/// are printed; `query_vector` is omitted.
#[derive(Debug, Clone, PartialEq)]
pub struct VectorSearchRequest {
    /// Id of the column whose vectors are searched.
    pub column_id: ColumnId,
    /// The query vector. NOTE(review): presumably its dimension must match the indexed
    /// column's dimension — confirm against the search implementation.
    pub query_vector: Vec<f32>,
    /// Number of matches requested (presumably the `k` in k-nearest-neighbours, mirroring
    /// the `k` argument of `VectorIndexEngine::search`).
    pub k: usize,
    /// Distance metric used to compare vectors.
    pub metric: VectorDistanceMetric,
}
38
/// Result of [`VectorIndexEngine::search`].
///
/// NOTE(review): `keys` and `distances` look like parallel vectors, with `distances[i]`
/// belonging to `keys[i]`. The equal-length invariant is not enforced by this type;
/// confirm against the engine implementations.
#[derive(Debug, Clone, PartialEq)]
pub struct VectorSearchMatches {
    /// Keys of the matched vectors, i.e. the keys passed to `VectorIndexEngine::add`.
    pub keys: Vec<u64>,
    /// Distance of each match from the query vector.
    pub distances: Vec<f32>,
}
47
/// Abstraction over a vector index implementation.
///
/// The `Send + Sync` bound allows implementations to be shared and moved across threads.
/// Fallible operations report errors as [`BoxedError`].
pub trait VectorIndexEngine: Send + Sync {
    /// Inserts `vector` into the index under `key`.
    fn add(&mut self, key: u64, vector: &[f32]) -> Result<(), BoxedError>;

    /// Searches the index for matches of `query`, requesting `k` results.
    ///
    /// NOTE(review): whether fewer than `k` results may be returned, and the ordering of
    /// the returned matches, are implementation-defined here — confirm per engine.
    fn search(&self, query: &[f32], k: usize) -> Result<VectorSearchMatches, BoxedError>;

    /// Length in bytes of the serialized index (presumably the buffer size required by
    /// `save_to_buffer` — confirm).
    fn serialized_length(&self) -> usize;

    /// Serializes the index into `buffer`.
    ///
    /// NOTE(review): presumably `buffer` must be at least `serialized_length()` bytes long.
    fn save_to_buffer(&self, buffer: &mut [u8]) -> Result<(), BoxedError>;

    /// Reserves space for `capacity` entries. NOTE(review): whether this is a total or
    /// additional capacity is not visible here — confirm.
    fn reserve(&mut self, capacity: usize) -> Result<(), BoxedError>;

    /// Number of entries currently in the index (presumably).
    fn size(&self) -> usize;

    /// Number of entries the index can hold without growing (presumably).
    fn capacity(&self) -> usize;

    /// Memory used by the index, presumably in bytes — confirm.
    fn memory_usage(&self) -> usize;
}
77
/// Hint for selecting rows within each time series during a scan
/// (see [`ScanRequest::series_row_selector`]).
///
/// `Display` is derived via `strum`, so a variant renders as its name (e.g. `LastRow`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Display)]
pub enum TimeSeriesRowSelector {
    /// Keep only the last row of each time series (presumably ordered by time index —
    /// confirm against the scanner implementation).
    LastRow,
}
84
/// Hint describing how scanned data should be distributed
/// (see [`ScanRequest::distribution`]).
///
/// `Display` is derived via `strum`, so a variant renders as its name.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Display)]
pub enum TimeSeriesDistribution {
    /// NOTE(review): presumably data is distributed by time window — confirm.
    TimeWindowed,
    /// NOTE(review): presumably all rows of a series are kept together — confirm.
    PerSeries,
}
95
/// Parameters of a scan: projection, filters, ordering/limit, and optional scanner hints.
///
/// `Default` yields a request with every optional field unset, no filters, and
/// `snapshot_on_scan == false`.
#[derive(Default, Clone, Debug, PartialEq)]
pub struct ScanRequest {
    /// Columns (and nested paths) to read. `None` means no projection was specified.
    /// Use [`ScanRequest::projection_indices`] to get the plain column indices.
    pub projection_input: Option<ProjectionInput>,
    /// Filter expressions. NOTE(review): presumably combined with AND — confirm.
    pub filters: Vec<Expr>,
    /// Requested output ordering, if any.
    pub output_ordering: Option<Vec<OrderOption>>,
    /// Maximum number of rows requested, if any.
    pub limit: Option<usize>,
    /// Optional per-series row-selection hint.
    pub series_row_selector: Option<TimeSeriesRowSelector>,
    /// Upper sequence bound for memtable data; rendered as `sequence` by `Display`.
    /// NOTE(review): inclusive vs. exclusive is not visible here — confirm.
    pub memtable_max_sequence: Option<SequenceNumber>,
    /// Lower sequence bound for memtable data. Not rendered by `Display`.
    /// NOTE(review): inclusive vs. exclusive is not visible here — confirm.
    pub memtable_min_sequence: Option<SequenceNumber>,
    /// Minimum sequence bound applied to SST data (presumably — confirm semantics).
    pub sst_min_sequence: Option<SequenceNumber>,
    /// Flag rendered by `Display` only when `true`.
    pub snapshot_on_scan: bool,
    /// Optional data-distribution hint.
    pub distribution: Option<TimeSeriesDistribution>,
    /// Optional vector search to perform as part of the scan.
    pub vector_search: Option<VectorSearchRequest>,
}
132
133impl ScanRequest {
134 pub fn projection_indices(&self) -> Option<&[usize]> {
136 self.projection_input
137 .as_ref()
138 .map(|projection_input| projection_input.projection.as_slice())
139 }
140}
141
142impl Display for ScanRequest {
143 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
144 enum Delimiter {
145 None,
146 Init,
147 }
148
149 impl Delimiter {
150 fn as_str(&mut self) -> &str {
151 match self {
152 Delimiter::None => {
153 *self = Delimiter::Init;
154 ""
155 }
156 Delimiter::Init => ", ",
157 }
158 }
159 }
160
161 let mut delimiter = Delimiter::None;
162
163 write!(f, "ScanRequest {{ ")?;
164 if let Some(projection) = &self.projection_input {
165 write!(f, "{}projection: {:?}", delimiter.as_str(), projection)?;
166 }
167 if !self.filters.is_empty() {
168 write!(
169 f,
170 "{}filters: [{}]",
171 delimiter.as_str(),
172 self.filters
173 .iter()
174 .map(|f| f.to_string())
175 .collect::<Vec<_>>()
176 .join(", ")
177 )?;
178 }
179 if let Some(output_ordering) = &self.output_ordering {
180 write!(
181 f,
182 "{}output_ordering: {:?}",
183 delimiter.as_str(),
184 output_ordering
185 )?;
186 }
187 if let Some(limit) = &self.limit {
188 write!(f, "{}limit: {}", delimiter.as_str(), limit)?;
189 }
190 if let Some(series_row_selector) = &self.series_row_selector {
191 write!(
192 f,
193 "{}series_row_selector: {}",
194 delimiter.as_str(),
195 series_row_selector
196 )?;
197 }
198 if let Some(sequence) = &self.memtable_max_sequence {
199 write!(f, "{}sequence: {}", delimiter.as_str(), sequence)?;
200 }
201 if let Some(sst_min_sequence) = &self.sst_min_sequence {
202 write!(
203 f,
204 "{}sst_min_sequence: {}",
205 delimiter.as_str(),
206 sst_min_sequence
207 )?;
208 }
209 if self.snapshot_on_scan {
210 write!(
211 f,
212 "{}snapshot_on_scan: {}",
213 delimiter.as_str(),
214 self.snapshot_on_scan
215 )?;
216 }
217 if let Some(distribution) = &self.distribution {
218 write!(f, "{}distribution: {}", delimiter.as_str(), distribution)?;
219 }
220 if let Some(vector_search) = &self.vector_search {
221 write!(
222 f,
223 "{}vector_search: column_id={}, k={}, metric={}",
224 delimiter.as_str(),
225 vector_search.column_id,
226 vector_search.k,
227 vector_search.metric
228 )?;
229 }
230 write!(f, " }}")
231 }
232}
233
#[cfg(test)]
mod tests {
    use datafusion_expr::{Operator, binary_expr, col, lit};

    use super::*;

    #[test]
    fn test_display_scan_request() {
        // `Display` always writes "ScanRequest { " and " }", so an empty request
        // renders with two spaces between the braces.
        let request = ScanRequest {
            ..Default::default()
        };
        assert_eq!(request.to_string(), "ScanRequest {  }");

        let projection_input = Some(vec![1, 2].into());
        let request = ScanRequest {
            projection_input,
            filters: vec![
                binary_expr(col("i"), Operator::Gt, lit(1)),
                binary_expr(col("s"), Operator::Eq, lit("x")),
            ],
            limit: Some(10),
            ..Default::default()
        };
        assert_eq!(
            request.to_string(),
            r#"ScanRequest { projection: ProjectionInput { projection: [1, 2], nested_paths: [] }, filters: [i > Int32(1), s = Utf8("x")], limit: 10 }"#
        );

        let request = ScanRequest {
            filters: vec![
                binary_expr(col("i"), Operator::Gt, lit(1)),
                binary_expr(col("s"), Operator::Eq, lit("x")),
            ],
            limit: Some(10),
            ..Default::default()
        };
        assert_eq!(
            request.to_string(),
            r#"ScanRequest { filters: [i > Int32(1), s = Utf8("x")], limit: 10 }"#
        );

        let projection_input = Some(vec![1, 2].into());
        let request = ScanRequest {
            projection_input,
            limit: Some(10),
            ..Default::default()
        };
        assert_eq!(
            request.to_string(),
            "ScanRequest { projection: ProjectionInput { projection: [1, 2], nested_paths: [] }, limit: 10 }"
        );

        let projection_input = Some(ProjectionInput::new(vec![1, 2]).with_nested_paths(vec![
            vec!["j".to_string(), "a".to_string(), "b".to_string()],
            vec!["s".to_string(), "x".to_string()],
        ]));
        let request = ScanRequest {
            projection_input,
            limit: Some(10),
            ..Default::default()
        };
        assert_eq!(
            request.to_string(),
            r#"ScanRequest { projection: ProjectionInput { projection: [1, 2], nested_paths: [["j", "a", "b"], ["s", "x"]] }, limit: 10 }"#
        );

        let request = ScanRequest {
            snapshot_on_scan: true,
            ..Default::default()
        };
        assert_eq!(
            request.to_string(),
            "ScanRequest { snapshot_on_scan: true }"
        );
    }

    #[test]
    fn test_display_scan_request_hints_and_sequences() {
        // `memtable_max_sequence` is rendered under the label `sequence`, and the strum
        // derived `Display` of the hint enums prints the variant names.
        let request = ScanRequest {
            series_row_selector: Some(TimeSeriesRowSelector::LastRow),
            memtable_max_sequence: Some(100),
            sst_min_sequence: Some(10),
            distribution: Some(TimeSeriesDistribution::PerSeries),
            ..Default::default()
        };
        assert_eq!(
            request.to_string(),
            "ScanRequest { series_row_selector: LastRow, sequence: 100, sst_min_sequence: 10, distribution: PerSeries }"
        );
    }

    #[test]
    fn test_projection_indices() {
        assert_eq!(ScanRequest::default().projection_indices(), None);

        let request = ScanRequest {
            projection_input: Some(vec![3, 1].into()),
            ..Default::default()
        };
        assert_eq!(request.projection_indices(), Some(&[3, 1][..]));
    }
}