1use std::fmt::{Display, Formatter};
16
17use common_error::ext::BoxedError;
18use common_recordbatch::OrderOption;
19use datafusion_expr::expr::Expr;
20pub use datatypes::schema::{VectorDistanceMetric, VectorIndexEngineType};
22use strum::Display;
23
24use crate::storage::{ColumnId, SequenceNumber};
25
26#[derive(Debug, Clone, PartialEq)]
28pub struct VectorSearchRequest {
29 pub column_id: ColumnId,
31 pub query_vector: Vec<f32>,
33 pub k: usize,
35 pub metric: VectorDistanceMetric,
37}
38
/// Result returned by [`VectorIndexEngine::search`]: matched keys together
/// with the distances reported for them.
#[derive(Clone, Debug, PartialEq)]
pub struct VectorSearchMatches {
    /// Keys of the matched vectors (presumably the keys given to
    /// [`VectorIndexEngine::add`] — confirm with the implementation).
    pub keys: Vec<u64>,
    /// Distances reported for the matches.
    ///
    /// NOTE(review): presumably `distances[i]` belongs to `keys[i]`; the type
    /// itself does not enforce equal lengths.
    pub distances: Vec<f32>,
}
47
48pub trait VectorIndexEngine: Send + Sync {
53 fn add(&mut self, key: u64, vector: &[f32]) -> Result<(), BoxedError>;
55
56 fn search(&self, query: &[f32], k: usize) -> Result<VectorSearchMatches, BoxedError>;
58
59 fn serialized_length(&self) -> usize;
61
62 fn save_to_buffer(&self, buffer: &mut [u8]) -> Result<(), BoxedError>;
64
65 fn reserve(&mut self, capacity: usize) -> Result<(), BoxedError>;
67
68 fn size(&self) -> usize;
70
71 fn capacity(&self) -> usize;
73
74 fn memory_usage(&self) -> usize;
76}
77
78#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Display)]
80pub enum TimeSeriesRowSelector {
81 LastRow,
83}
84
85#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Display)]
87pub enum TimeSeriesDistribution {
88 TimeWindowed,
91 PerSeries,
94}
95
96#[derive(Default, Clone, Debug, PartialEq)]
97pub struct ScanRequest {
98 pub projection: Option<Vec<usize>>,
101 pub filters: Vec<Expr>,
103 pub output_ordering: Option<Vec<OrderOption>>,
105 pub limit: Option<usize>,
110 pub series_row_selector: Option<TimeSeriesRowSelector>,
112 pub memtable_max_sequence: Option<SequenceNumber>,
116 pub memtable_min_sequence: Option<SequenceNumber>,
119 pub sst_min_sequence: Option<SequenceNumber>,
122 pub distribution: Option<TimeSeriesDistribution>,
124 pub vector_search: Option<VectorSearchRequest>,
127 pub force_flat_format: bool,
129}
130
131impl Display for ScanRequest {
132 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
133 enum Delimiter {
134 None,
135 Init,
136 }
137
138 impl Delimiter {
139 fn as_str(&mut self) -> &str {
140 match self {
141 Delimiter::None => {
142 *self = Delimiter::Init;
143 ""
144 }
145 Delimiter::Init => ", ",
146 }
147 }
148 }
149
150 let mut delimiter = Delimiter::None;
151
152 write!(f, "ScanRequest {{ ")?;
153 if let Some(projection) = &self.projection {
154 write!(f, "{}projection: {:?}", delimiter.as_str(), projection)?;
155 }
156 if !self.filters.is_empty() {
157 write!(
158 f,
159 "{}filters: [{}]",
160 delimiter.as_str(),
161 self.filters
162 .iter()
163 .map(|f| f.to_string())
164 .collect::<Vec<_>>()
165 .join(", ")
166 )?;
167 }
168 if let Some(output_ordering) = &self.output_ordering {
169 write!(
170 f,
171 "{}output_ordering: {:?}",
172 delimiter.as_str(),
173 output_ordering
174 )?;
175 }
176 if let Some(limit) = &self.limit {
177 write!(f, "{}limit: {}", delimiter.as_str(), limit)?;
178 }
179 if let Some(series_row_selector) = &self.series_row_selector {
180 write!(
181 f,
182 "{}series_row_selector: {}",
183 delimiter.as_str(),
184 series_row_selector
185 )?;
186 }
187 if let Some(sequence) = &self.memtable_max_sequence {
188 write!(f, "{}sequence: {}", delimiter.as_str(), sequence)?;
189 }
190 if let Some(sst_min_sequence) = &self.sst_min_sequence {
191 write!(
192 f,
193 "{}sst_min_sequence: {}",
194 delimiter.as_str(),
195 sst_min_sequence
196 )?;
197 }
198 if let Some(distribution) = &self.distribution {
199 write!(f, "{}distribution: {}", delimiter.as_str(), distribution)?;
200 }
201 if let Some(vector_search) = &self.vector_search {
202 write!(
203 f,
204 "{}vector_search: column_id={}, k={}, metric={}",
205 delimiter.as_str(),
206 vector_search.column_id,
207 vector_search.k,
208 vector_search.metric
209 )?;
210 }
211 if self.force_flat_format {
212 write!(
213 f,
214 "{}force_flat_format: {}",
215 delimiter.as_str(),
216 self.force_flat_format
217 )?;
218 }
219 write!(f, " }}")
220 }
221}
222
#[cfg(test)]
mod tests {
    use datafusion_expr::{Operator, binary_expr, col, lit};

    use super::*;

    /// Checks the `Display` output of [`ScanRequest`] for several field
    /// combinations.
    #[test]
    fn test_display_scan_request() {
        // Nothing set: only the opening `"ScanRequest { "` and the closing
        // `" }"` are written, so two spaces sit between the braces.
        let request = ScanRequest {
            ..Default::default()
        };
        assert_eq!(request.to_string(), "ScanRequest {  }");

        // Projection, filters and limit together.
        let request = ScanRequest {
            projection: Some(vec![1, 2]),
            filters: vec![
                binary_expr(col("i"), Operator::Gt, lit(1)),
                binary_expr(col("s"), Operator::Eq, lit("x")),
            ],
            limit: Some(10),
            ..Default::default()
        };
        assert_eq!(
            request.to_string(),
            r#"ScanRequest { projection: [1, 2], filters: [i > Int32(1), s = Utf8("x")], limit: 10 }"#
        );

        // Filters first: no leading separator before the first printed field.
        let request = ScanRequest {
            filters: vec![
                binary_expr(col("i"), Operator::Gt, lit(1)),
                binary_expr(col("s"), Operator::Eq, lit("x")),
            ],
            limit: Some(10),
            ..Default::default()
        };
        assert_eq!(
            request.to_string(),
            r#"ScanRequest { filters: [i > Int32(1), s = Utf8("x")], limit: 10 }"#
        );

        // An empty filter list is skipped entirely.
        let request = ScanRequest {
            projection: Some(vec![1, 2]),
            limit: Some(10),
            ..Default::default()
        };
        assert_eq!(
            request.to_string(),
            "ScanRequest { projection: [1, 2], limit: 10 }"
        );

        // `force_flat_format` is printed only when it is `true`.
        let request = ScanRequest {
            force_flat_format: true,
            ..Default::default()
        };
        assert_eq!(
            request.to_string(),
            "ScanRequest { force_flat_format: true }"
        );
    }
}