1use std::fmt::{Display, Formatter};
16
17use common_error::ext::BoxedError;
18use common_recordbatch::OrderOption;
19use datafusion_expr::expr::Expr;
20pub use datatypes::schema::{VectorDistanceMetric, VectorIndexEngineType};
22use strum::Display;
23
24use crate::storage::{ColumnId, SequenceNumber};
25
26#[derive(Debug, Clone, PartialEq)]
28pub struct VectorSearchRequest {
29 pub column_id: ColumnId,
31 pub query_vector: Vec<f32>,
33 pub k: usize,
35 pub metric: VectorDistanceMetric,
37}
38
/// Result of a vector search: matched keys together with their distances.
///
/// The two vectors are parallel — `distances[i]` belongs to `keys[i]`
/// (NOTE(review): pairing assumed from the field layout; confirm with producers).
#[derive(Clone, Debug, PartialEq)]
pub struct VectorSearchMatches {
    /// Keys of the matched entries.
    pub keys: Vec<u64>,
    /// Distance of each matched entry from the query vector.
    pub distances: Vec<f32>,
}
47
48pub trait VectorIndexEngine: Send + Sync {
53 fn add(&mut self, key: u64, vector: &[f32]) -> Result<(), BoxedError>;
55
56 fn search(&self, query: &[f32], k: usize) -> Result<VectorSearchMatches, BoxedError>;
58
59 fn serialized_length(&self) -> usize;
61
62 fn save_to_buffer(&self, buffer: &mut [u8]) -> Result<(), BoxedError>;
64
65 fn reserve(&mut self, capacity: usize) -> Result<(), BoxedError>;
67
68 fn size(&self) -> usize;
70
71 fn capacity(&self) -> usize;
73
74 fn memory_usage(&self) -> usize;
76}
77
78#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Display)]
80pub enum TimeSeriesRowSelector {
81 LastRow,
83}
84
85#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Display)]
87pub enum TimeSeriesDistribution {
88 TimeWindowed,
91 PerSeries,
94}
95
96#[derive(Default, Clone, Debug, PartialEq)]
97pub struct ScanRequest {
98 pub projection: Option<Vec<usize>>,
101 pub filters: Vec<Expr>,
103 pub output_ordering: Option<Vec<OrderOption>>,
105 pub limit: Option<usize>,
110 pub series_row_selector: Option<TimeSeriesRowSelector>,
112 pub memtable_max_sequence: Option<SequenceNumber>,
118 pub memtable_min_sequence: Option<SequenceNumber>,
121 pub sst_min_sequence: Option<SequenceNumber>,
124 pub snapshot_on_scan: bool,
126 pub distribution: Option<TimeSeriesDistribution>,
128 pub vector_search: Option<VectorSearchRequest>,
131}
132
133impl Display for ScanRequest {
134 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
135 enum Delimiter {
136 None,
137 Init,
138 }
139
140 impl Delimiter {
141 fn as_str(&mut self) -> &str {
142 match self {
143 Delimiter::None => {
144 *self = Delimiter::Init;
145 ""
146 }
147 Delimiter::Init => ", ",
148 }
149 }
150 }
151
152 let mut delimiter = Delimiter::None;
153
154 write!(f, "ScanRequest {{ ")?;
155 if let Some(projection) = &self.projection {
156 write!(f, "{}projection: {:?}", delimiter.as_str(), projection)?;
157 }
158 if !self.filters.is_empty() {
159 write!(
160 f,
161 "{}filters: [{}]",
162 delimiter.as_str(),
163 self.filters
164 .iter()
165 .map(|f| f.to_string())
166 .collect::<Vec<_>>()
167 .join(", ")
168 )?;
169 }
170 if let Some(output_ordering) = &self.output_ordering {
171 write!(
172 f,
173 "{}output_ordering: {:?}",
174 delimiter.as_str(),
175 output_ordering
176 )?;
177 }
178 if let Some(limit) = &self.limit {
179 write!(f, "{}limit: {}", delimiter.as_str(), limit)?;
180 }
181 if let Some(series_row_selector) = &self.series_row_selector {
182 write!(
183 f,
184 "{}series_row_selector: {}",
185 delimiter.as_str(),
186 series_row_selector
187 )?;
188 }
189 if let Some(sequence) = &self.memtable_max_sequence {
190 write!(f, "{}sequence: {}", delimiter.as_str(), sequence)?;
191 }
192 if let Some(sst_min_sequence) = &self.sst_min_sequence {
193 write!(
194 f,
195 "{}sst_min_sequence: {}",
196 delimiter.as_str(),
197 sst_min_sequence
198 )?;
199 }
200 if self.snapshot_on_scan {
201 write!(
202 f,
203 "{}snapshot_on_scan: {}",
204 delimiter.as_str(),
205 self.snapshot_on_scan
206 )?;
207 }
208 if let Some(distribution) = &self.distribution {
209 write!(f, "{}distribution: {}", delimiter.as_str(), distribution)?;
210 }
211 if let Some(vector_search) = &self.vector_search {
212 write!(
213 f,
214 "{}vector_search: column_id={}, k={}, metric={}",
215 delimiter.as_str(),
216 vector_search.column_id,
217 vector_search.k,
218 vector_search.metric
219 )?;
220 }
221 write!(f, " }}")
222 }
223}
224
#[cfg(test)]
mod tests {
    use datafusion_expr::{Operator, binary_expr, col, lit};

    use super::*;

    /// The two filters shared by several cases: `i > 1` and `s = "x"`.
    fn sample_filters() -> Vec<Expr> {
        vec![
            binary_expr(col("i"), Operator::Gt, lit(1)),
            binary_expr(col("s"), Operator::Eq, lit("x")),
        ]
    }

    #[test]
    fn test_display_scan_request() {
        // Each case pairs a request with its expected `Display` rendering.
        let cases = [
            (ScanRequest::default(), "ScanRequest { }"),
            (
                ScanRequest {
                    projection: Some(vec![1, 2]),
                    filters: sample_filters(),
                    limit: Some(10),
                    ..Default::default()
                },
                r#"ScanRequest { projection: [1, 2], filters: [i > Int32(1), s = Utf8("x")], limit: 10 }"#,
            ),
            (
                ScanRequest {
                    filters: sample_filters(),
                    limit: Some(10),
                    ..Default::default()
                },
                r#"ScanRequest { filters: [i > Int32(1), s = Utf8("x")], limit: 10 }"#,
            ),
            (
                ScanRequest {
                    projection: Some(vec![1, 2]),
                    limit: Some(10),
                    ..Default::default()
                },
                "ScanRequest { projection: [1, 2], limit: 10 }",
            ),
            (
                ScanRequest {
                    snapshot_on_scan: true,
                    ..Default::default()
                },
                "ScanRequest { snapshot_on_scan: true }",
            ),
        ];

        for (idx, (request, expected)) in cases.into_iter().enumerate() {
            assert_eq!(request.to_string(), expected, "case #{idx}");
        }
    }
}