1use std::collections::HashMap;
16use std::fmt::{Display, Formatter};
17
18use common_error::ext::BoxedError;
19use common_recordbatch::OrderOption;
20use datafusion_expr::expr::Expr;
21pub use datatypes::schema::{VectorDistanceMetric, VectorIndexEngineType};
23use datatypes::types::json_type::JsonNativeType;
24use itertools::Itertools;
25use strum::Display;
26
27use crate::storage::{ColumnId, ProjectionInput, SequenceNumber};
28
/// Parameters of a vector similarity search attached to a [`ScanRequest`].
///
/// `Display for ScanRequest` renders `column_id`, `k` and `metric`, but not the
/// query vector itself.
#[derive(Debug, Clone, PartialEq)]
pub struct VectorSearchRequest {
    /// Id of the column holding the vectors to search.
    pub column_id: ColumnId,
    /// The query vector. NOTE(review): presumably its length must match the
    /// indexed column's dimension — confirm against the index implementation.
    pub query_vector: Vec<f32>,
    /// Number of results requested (presumably the k in k-nearest-neighbors).
    pub k: usize,
    /// Distance metric used to compare vectors.
    pub metric: VectorDistanceMetric,
}
41
/// Result of [`VectorIndexEngine::search`].
#[derive(Debug, Clone, PartialEq)]
pub struct VectorSearchMatches {
    /// Keys of the matched entries (the keys passed to `VectorIndexEngine::add`).
    pub keys: Vec<u64>,
    /// Distances of the matched entries. NOTE(review): presumably parallel to
    /// `keys` (`distances[i]` belongs to `keys[i]`) — confirm with implementations.
    pub distances: Vec<f32>,
}
50
/// Abstraction over a vector index implementation.
///
/// Implementors must be `Send + Sync`. Every fallible operation reports its
/// error as a [`BoxedError`].
pub trait VectorIndexEngine: Send + Sync {
    /// Adds `vector` to the index under `key`.
    ///
    /// NOTE(review): behavior for duplicate keys or a dimension mismatch is not
    /// specified by this trait — confirm against implementations.
    fn add(&mut self, key: u64, vector: &[f32]) -> Result<(), BoxedError>;

    /// Searches the index with `query`, requesting `k` results, and returns the
    /// matched keys with their distances. Presumably returns at most `k`
    /// matches, nearest first — TODO confirm.
    fn search(&self, query: &[f32], k: usize) -> Result<VectorSearchMatches, BoxedError>;

    /// Length of the serialized index. NOTE(review): presumably the number of
    /// bytes `save_to_buffer` needs — confirm.
    fn serialized_length(&self) -> usize;

    /// Serializes the index into `buffer`.
    /// Assumes `buffer` is at least `serialized_length()` bytes — TODO confirm.
    fn save_to_buffer(&self, buffer: &mut [u8]) -> Result<(), BoxedError>;

    /// Presumably pre-allocates room for `capacity` entries — TODO confirm.
    fn reserve(&mut self, capacity: usize) -> Result<(), BoxedError>;

    /// Presumably the number of entries currently stored — TODO confirm.
    fn size(&self) -> usize;

    /// Presumably the number of entries storable without reallocating — TODO confirm.
    fn capacity(&self) -> usize;

    /// Memory used by the index. NOTE(review): unit (bytes?) not confirmed here.
    fn memory_usage(&self) -> usize;
}
80
/// Per-series row selection applied by a scan.
///
/// The `strum::Display` derive renders the variant name (e.g. `LastRow`), which
/// is what `Display for ScanRequest` prints as `series_row_selector`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Display)]
pub enum TimeSeriesRowSelector {
    /// Presumably selects only the last row of each time series — TODO confirm.
    LastRow,
}
87
/// How scanned data is distributed/partitioned in the scan output.
///
/// The `strum::Display` derive renders the variant name, which is what
/// `Display for ScanRequest` prints as `distribution`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Display)]
pub enum TimeSeriesDistribution {
    /// Presumably partitions output by time window — TODO confirm.
    TimeWindowed,
    /// Presumably partitions output so each series is kept together — TODO confirm.
    PerSeries,
}
98
/// Parameters of a scan.
///
/// `Default` yields an unconstrained request: every `Option` is `None`, both
/// flags are `false`, and `filters` and `json_type_hint` are empty.
#[derive(Default, Clone, Debug, PartialEq)]
pub struct ScanRequest {
    /// Optional projection. `None` presumably means "all columns" — TODO confirm.
    /// See [`ScanRequest::projection_indices`].
    pub projection_input: Option<ProjectionInput>,
    /// Filter expressions for the scan (presumably combined with AND — TODO confirm).
    pub filters: Vec<Expr>,
    /// Requested output ordering, if any.
    pub output_ordering: Option<Vec<OrderOption>>,
    /// Requested maximum number of output rows, if any.
    pub limit: Option<usize>,
    /// Optional per-series row selector.
    pub series_row_selector: Option<TimeSeriesRowSelector>,
    /// Upper sequence bound for memtable data (rendered as `sequence` by
    /// `Display`). NOTE(review): inclusivity not confirmed here.
    pub memtable_max_sequence: Option<SequenceNumber>,
    /// Lower sequence bound for memtable data. NOTE(review): inclusivity not
    /// confirmed here.
    pub memtable_min_sequence: Option<SequenceNumber>,
    /// Lower sequence bound applied to SST data. NOTE(review): inclusivity not
    /// confirmed here.
    pub sst_min_sequence: Option<SequenceNumber>,
    /// Presumably, when `true`, SST files are not read — TODO confirm.
    pub skip_sst_files: bool,
    /// Presumably, when `true`, a snapshot is taken at scan time — TODO confirm.
    pub snapshot_on_scan: bool,
    /// Requested output distribution, if any.
    pub distribution: Option<TimeSeriesDistribution>,
    /// Optional vector similarity search to perform with the scan.
    pub vector_search: Option<VectorSearchRequest>,
    /// JSON type hints; presumably keyed by column name — TODO confirm.
    pub json_type_hint: HashMap<String, JsonNativeType>,
}
140
141impl ScanRequest {
142 pub fn projection_indices(&self) -> Option<&[usize]> {
144 self.projection_input
145 .as_ref()
146 .map(|projection_input| projection_input.projection.as_slice())
147 }
148}
149
150impl Display for ScanRequest {
151 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
152 enum Delimiter {
153 None,
154 Init,
155 }
156
157 impl Delimiter {
158 fn as_str(&mut self) -> &str {
159 match self {
160 Delimiter::None => {
161 *self = Delimiter::Init;
162 ""
163 }
164 Delimiter::Init => ", ",
165 }
166 }
167 }
168
169 let mut delimiter = Delimiter::None;
170
171 write!(f, "ScanRequest {{ ")?;
172 if let Some(projection) = &self.projection_input {
173 write!(f, "{}projection: {:?}", delimiter.as_str(), projection)?;
174 }
175 if !self.filters.is_empty() {
176 write!(
177 f,
178 "{}filters: [{}]",
179 delimiter.as_str(),
180 self.filters
181 .iter()
182 .map(|f| f.to_string())
183 .collect::<Vec<_>>()
184 .join(", ")
185 )?;
186 }
187 if let Some(output_ordering) = &self.output_ordering {
188 write!(
189 f,
190 "{}output_ordering: {:?}",
191 delimiter.as_str(),
192 output_ordering
193 )?;
194 }
195 if let Some(limit) = &self.limit {
196 write!(f, "{}limit: {}", delimiter.as_str(), limit)?;
197 }
198 if let Some(series_row_selector) = &self.series_row_selector {
199 write!(
200 f,
201 "{}series_row_selector: {}",
202 delimiter.as_str(),
203 series_row_selector
204 )?;
205 }
206 if let Some(sequence) = &self.memtable_max_sequence {
207 write!(f, "{}sequence: {}", delimiter.as_str(), sequence)?;
208 }
209 if let Some(sst_min_sequence) = &self.sst_min_sequence {
210 write!(
211 f,
212 "{}sst_min_sequence: {}",
213 delimiter.as_str(),
214 sst_min_sequence
215 )?;
216 }
217 if self.skip_sst_files {
218 write!(
219 f,
220 "{}skip_sst_files: {}",
221 delimiter.as_str(),
222 self.skip_sst_files
223 )?;
224 }
225 if self.snapshot_on_scan {
226 write!(
227 f,
228 "{}snapshot_on_scan: {}",
229 delimiter.as_str(),
230 self.snapshot_on_scan
231 )?;
232 }
233 if let Some(distribution) = &self.distribution {
234 write!(f, "{}distribution: {}", delimiter.as_str(), distribution)?;
235 }
236 if let Some(vector_search) = &self.vector_search {
237 write!(
238 f,
239 "{}vector_search: column_id={}, k={}, metric={}",
240 delimiter.as_str(),
241 vector_search.column_id,
242 vector_search.k,
243 vector_search.metric
244 )?;
245 }
246 if !self.json_type_hint.is_empty() {
247 write!(
248 f,
249 "{}json_type_hint: {}",
250 delimiter.as_str(),
251 self.json_type_hint
252 .iter()
253 .map(|(column, json_type)| format!("({column}: {json_type})"))
254 .join(", ")
255 )?;
256 }
257 write!(f, " }}")
258 }
259}
260
#[cfg(test)]
mod tests {
    use datafusion_expr::{Operator, binary_expr, col, lit};

    use super::*;

    /// The `i > 1` and `s = 'x'` filter pair shared by several cases.
    fn sample_filters() -> Vec<Expr> {
        vec![
            binary_expr(col("i"), Operator::Gt, lit(1)),
            binary_expr(col("s"), Operator::Eq, lit("x")),
        ]
    }

    /// A projection over columns 1 and 2 without nested paths.
    fn simple_projection() -> Option<ProjectionInput> {
        Some(vec![1, 2].into())
    }

    #[test]
    fn test_display_scan_request() {
        // Nothing set: only the surrounding braces are printed.
        let empty = ScanRequest::default();
        assert_eq!(empty.to_string(), "ScanRequest { }");

        // Projection, filters and limit all present.
        let full = ScanRequest {
            projection_input: simple_projection(),
            filters: sample_filters(),
            limit: Some(10),
            ..Default::default()
        };
        assert_eq!(
            full.to_string(),
            r#"ScanRequest { projection: ProjectionInput { projection: [1, 2], nested_paths: [] }, filters: [i > Int32(1), s = Utf8("x")], limit: 10 }"#
        );

        // Filters and limit without a projection.
        let filtered = ScanRequest {
            filters: sample_filters(),
            limit: Some(10),
            ..Default::default()
        };
        assert_eq!(
            filtered.to_string(),
            r#"ScanRequest { filters: [i > Int32(1), s = Utf8("x")], limit: 10 }"#
        );

        // Projection and limit without filters.
        let projected = ScanRequest {
            projection_input: simple_projection(),
            limit: Some(10),
            ..Default::default()
        };
        assert_eq!(
            projected.to_string(),
            "ScanRequest { projection: ProjectionInput { projection: [1, 2], nested_paths: [] }, limit: 10 }"
        );

        // Projection carrying nested paths.
        let nested_paths: Vec<Vec<String>> = [vec!["j", "a", "b"], vec!["s", "x"]]
            .into_iter()
            .map(|path| path.into_iter().map(String::from).collect())
            .collect();
        let nested = ScanRequest {
            projection_input: Some(
                ProjectionInput::new(vec![1, 2]).with_nested_paths(nested_paths),
            ),
            limit: Some(10),
            ..Default::default()
        };
        assert_eq!(
            nested.to_string(),
            r#"ScanRequest { projection: ProjectionInput { projection: [1, 2], nested_paths: [["j", "a", "b"], ["s", "x"]] }, limit: 10 }"#
        );

        // Boolean flags are printed only when set.
        let flag_cases = [
            (
                ScanRequest {
                    snapshot_on_scan: true,
                    ..Default::default()
                },
                "ScanRequest { snapshot_on_scan: true }",
            ),
            (
                ScanRequest {
                    skip_sst_files: true,
                    ..Default::default()
                },
                "ScanRequest { skip_sst_files: true }",
            ),
        ];
        for (request, expected) in flag_cases {
            assert_eq!(request.to_string(), expected);
        }
    }
}