1use std::fmt::{Display, Formatter};
16
17use common_error::ext::BoxedError;
18use common_recordbatch::OrderOption;
19use datafusion_expr::expr::Expr;
20pub use datatypes::schema::{VectorDistanceMetric, VectorIndexEngineType};
22use strum::Display;
23
24use crate::storage::{ColumnId, SequenceNumber};
25
/// Parameters of a vector similarity search, carried by
/// [`ScanRequest::vector_search`].
#[derive(Debug, Clone, PartialEq)]
pub struct VectorSearchRequest {
    /// Id of the column whose vectors are searched.
    pub column_id: ColumnId,
    /// The query vector to compare stored vectors against.
    pub query_vector: Vec<f32>,
    /// Requested number of matches; presumably the `k` of a top-k
    /// nearest-neighbour search (confirm against the index implementation).
    pub k: usize,
    /// Distance metric used to compare vectors.
    pub metric: VectorDistanceMetric,
}
38
/// Matches returned by [`VectorIndexEngine::search`].
///
/// NOTE(review): `keys` and `distances` look like parallel arrays (entry `i` of
/// each describes the same match) — confirm against the engine implementations.
#[derive(Debug, Clone, PartialEq)]
pub struct VectorSearchMatches {
    /// Keys of the matched vectors; presumably the keys passed to
    /// [`VectorIndexEngine::add`].
    pub keys: Vec<u64>,
    /// Distances of the matches from the query vector.
    pub distances: Vec<f32>,
}
47
/// Abstraction over a vector index implementation.
///
/// Implementors must be `Send + Sync`; fallible operations report failures as
/// [`BoxedError`].
pub trait VectorIndexEngine: Send + Sync {
    /// Adds `vector` to the index, identified by `key`.
    fn add(&mut self, key: u64, vector: &[f32]) -> Result<(), BoxedError>;

    /// Searches the index with `query`, requesting `k` matches.
    fn search(&self, query: &[f32], k: usize) -> Result<VectorSearchMatches, BoxedError>;

    /// Length of the serialized index; presumably the number of bytes that
    /// [`save_to_buffer`](Self::save_to_buffer) needs — confirm with implementors.
    fn serialized_length(&self) -> usize;

    /// Serializes the index into `buffer`.
    fn save_to_buffer(&self, buffer: &mut [u8]) -> Result<(), BoxedError>;

    /// Reserves room for `capacity` entries (presumably vectors — confirm).
    fn reserve(&mut self, capacity: usize) -> Result<(), BoxedError>;

    /// Current size of the index; presumably the number of stored vectors.
    fn size(&self) -> usize;

    /// Current capacity of the index; presumably in the same units as
    /// [`size`](Self::size).
    fn capacity(&self) -> usize;

    /// Memory used by the index (units not specified here; presumably bytes).
    fn memory_usage(&self) -> usize;
}
77
/// Selector restricting which rows of each time series a scan returns
/// (see [`ScanRequest::series_row_selector`]).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Display)]
pub enum TimeSeriesRowSelector {
    /// Presumably selects only the last row of each time series — confirm.
    LastRow,
}
84
/// Requested distribution of scan output (see [`ScanRequest::distribution`]).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Display)]
pub enum TimeSeriesDistribution {
    /// Presumably distributes output by time window — confirm with the scanner.
    TimeWindowed,
    /// Presumably distributes output per time series — confirm with the scanner.
    PerSeries,
}
95
/// A request to scan data.
///
/// Every field is optional: `Default` yields no projection, no filters and
/// `None` for every other option.
#[derive(Default, Clone, Debug, PartialEq)]
pub struct ScanRequest {
    /// Column indices to read. `None` presumably reads every column; the index
    /// space (e.g. positions in the region schema) should be confirmed.
    pub projection: Option<Vec<usize>>,
    /// Filter expressions for the scan; empty means no filters.
    pub filters: Vec<Expr>,
    /// Requested ordering of the output, if any.
    pub output_ordering: Option<Vec<OrderOption>>,
    /// Row limit, if any.
    pub limit: Option<usize>,
    /// Optional selector restricting which rows of each time series are returned.
    pub series_row_selector: Option<TimeSeriesRowSelector>,
    /// Upper sequence-number bound for memtable data, if any (inclusivity not
    /// shown here — confirm).
    pub memtable_max_sequence: Option<SequenceNumber>,
    /// Lower sequence-number bound for memtable data, if any (inclusivity not
    /// shown here — confirm).
    pub memtable_min_sequence: Option<SequenceNumber>,
    /// Minimum sequence number for SST data, if any (presumably a lower bound
    /// like `memtable_min_sequence` — confirm).
    pub sst_min_sequence: Option<SequenceNumber>,
    /// Requested distribution of the output, if any.
    pub distribution: Option<TimeSeriesDistribution>,
    /// Vector similarity search to perform as part of the scan, if any.
    pub vector_search: Option<VectorSearchRequest>,
}
128
129impl Display for ScanRequest {
130 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
131 enum Delimiter {
132 None,
133 Init,
134 }
135
136 impl Delimiter {
137 fn as_str(&mut self) -> &str {
138 match self {
139 Delimiter::None => {
140 *self = Delimiter::Init;
141 ""
142 }
143 Delimiter::Init => ", ",
144 }
145 }
146 }
147
148 let mut delimiter = Delimiter::None;
149
150 write!(f, "ScanRequest {{ ")?;
151 if let Some(projection) = &self.projection {
152 write!(f, "{}projection: {:?}", delimiter.as_str(), projection)?;
153 }
154 if !self.filters.is_empty() {
155 write!(
156 f,
157 "{}filters: [{}]",
158 delimiter.as_str(),
159 self.filters
160 .iter()
161 .map(|f| f.to_string())
162 .collect::<Vec<_>>()
163 .join(", ")
164 )?;
165 }
166 if let Some(output_ordering) = &self.output_ordering {
167 write!(
168 f,
169 "{}output_ordering: {:?}",
170 delimiter.as_str(),
171 output_ordering
172 )?;
173 }
174 if let Some(limit) = &self.limit {
175 write!(f, "{}limit: {}", delimiter.as_str(), limit)?;
176 }
177 if let Some(series_row_selector) = &self.series_row_selector {
178 write!(
179 f,
180 "{}series_row_selector: {}",
181 delimiter.as_str(),
182 series_row_selector
183 )?;
184 }
185 if let Some(sequence) = &self.memtable_max_sequence {
186 write!(f, "{}sequence: {}", delimiter.as_str(), sequence)?;
187 }
188 if let Some(sst_min_sequence) = &self.sst_min_sequence {
189 write!(
190 f,
191 "{}sst_min_sequence: {}",
192 delimiter.as_str(),
193 sst_min_sequence
194 )?;
195 }
196 if let Some(distribution) = &self.distribution {
197 write!(f, "{}distribution: {}", delimiter.as_str(), distribution)?;
198 }
199 if let Some(vector_search) = &self.vector_search {
200 write!(
201 f,
202 "{}vector_search: column_id={}, k={}, metric={}",
203 delimiter.as_str(),
204 vector_search.column_id,
205 vector_search.k,
206 vector_search.metric
207 )?;
208 }
209 write!(f, " }}")
210 }
211}
212
#[cfg(test)]
mod tests {
    use datafusion_expr::{Operator, binary_expr, col, lit};

    use super::*;

    /// The two sample predicates shared by the display cases: `i > 1` and `s = "x"`.
    fn sample_filters() -> Vec<Expr> {
        vec![
            binary_expr(col("i"), Operator::Gt, lit(1)),
            binary_expr(col("s"), Operator::Eq, lit("x")),
        ]
    }

    /// Each case pairs a request with the exact string `Display` must produce.
    #[test]
    fn test_display_scan_request() {
        let cases = [
            (ScanRequest::default(), "ScanRequest { }"),
            (
                ScanRequest {
                    projection: Some(vec![1, 2]),
                    filters: sample_filters(),
                    limit: Some(10),
                    ..Default::default()
                },
                r#"ScanRequest { projection: [1, 2], filters: [i > Int32(1), s = Utf8("x")], limit: 10 }"#,
            ),
            (
                ScanRequest {
                    filters: sample_filters(),
                    limit: Some(10),
                    ..Default::default()
                },
                r#"ScanRequest { filters: [i > Int32(1), s = Utf8("x")], limit: 10 }"#,
            ),
            (
                ScanRequest {
                    projection: Some(vec![1, 2]),
                    limit: Some(10),
                    ..Default::default()
                },
                "ScanRequest { projection: [1, 2], limit: 10 }",
            ),
        ];

        for (request, expected) in cases {
            assert_eq!(request.to_string(), expected);
        }
    }
}