Skip to main content

store_api/storage/
requests.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::fmt::{Display, Formatter};
16
17use common_error::ext::BoxedError;
18use common_recordbatch::OrderOption;
19use datafusion_expr::expr::Expr;
20// Re-export vector types from datatypes to avoid duplication
21pub use datatypes::schema::{VectorDistanceMetric, VectorIndexEngineType};
22use strum::Display;
23
24use crate::storage::{ColumnId, ProjectionInput, SequenceNumber};
25
26/// A hint for KNN vector search.
27#[derive(Debug, Clone, PartialEq)]
28pub struct VectorSearchRequest {
29    /// Column ID of the vector column to search.
30    pub column_id: ColumnId,
31    /// The query vector to search for.
32    pub query_vector: Vec<f32>,
33    /// Number of nearest neighbors to return.
34    pub k: usize,
35    /// Distance metric to use (matches the index metric).
36    pub metric: VectorDistanceMetric,
37}
38
/// Matches produced by a vector index search.
///
/// NOTE(review): `keys` and `distances` are presumably parallel, i.e. `distances[i]`
/// belongs to `keys[i]`; this type does not enforce equal lengths — confirm with engines.
#[derive(Clone, Debug, PartialEq)]
pub struct VectorSearchMatches {
    /// Row offsets of the matched vectors within the index.
    pub keys: Vec<u64>,
    /// Distance of each match from the query vector.
    pub distances: Vec<f32>,
}
47
48/// Trait for vector index engines (HNSW implementations).
49///
50/// This trait defines the interface for pluggable vector index engines.
51/// Implementations (e.g., UsearchEngine) are provided by storage engines like mito2.
52pub trait VectorIndexEngine: Send + Sync {
53    /// Adds a vector with the given key.
54    fn add(&mut self, key: u64, vector: &[f32]) -> Result<(), BoxedError>;
55
56    /// Searches for k nearest neighbors.
57    fn search(&self, query: &[f32], k: usize) -> Result<VectorSearchMatches, BoxedError>;
58
59    /// Returns the serialized length.
60    fn serialized_length(&self) -> usize;
61
62    /// Serializes the index to a buffer.
63    fn save_to_buffer(&self, buffer: &mut [u8]) -> Result<(), BoxedError>;
64
65    /// Reserves capacity for vectors.
66    fn reserve(&mut self, capacity: usize) -> Result<(), BoxedError>;
67
68    /// Returns current size (number of vectors).
69    fn size(&self) -> usize;
70
71    /// Returns current capacity.
72    fn capacity(&self) -> usize;
73
74    /// Returns memory usage in bytes.
75    fn memory_usage(&self) -> usize;
76}
77
78/// A hint on how to select rows from a time-series.
79#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Display)]
80pub enum TimeSeriesRowSelector {
81    /// Only keep the last row of each time-series.
82    LastRow,
83}
84
85/// A hint on how to distribute time-series data on the scan output.
86#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Display)]
87pub enum TimeSeriesDistribution {
88    /// Data are distributed by time window first. The scanner will
89    /// return all data within one time window before moving to the next one.
90    TimeWindowed,
91    /// Data are organized by time-series first. The scanner will return
92    /// all data for one time-series before moving to the next one.
93    PerSeries,
94}
95
96#[derive(Default, Clone, Debug, PartialEq)]
97pub struct ScanRequest {
98    /// Optional projection information for the scan. `None` reads all root
99    /// columns.
100    pub projection_input: Option<ProjectionInput>,
101    /// Filters pushed down
102    pub filters: Vec<Expr>,
103    /// Expected output ordering. This is only a hint and isn't guaranteed.
104    pub output_ordering: Option<Vec<OrderOption>>,
105    /// limit can be used to reduce the amount scanned
106    /// from the datasource as a performance optimization.
107    /// If set, it contains the amount of rows needed by the caller,
108    /// The data source should return *at least* this number of rows if available.
109    pub limit: Option<usize>,
110    /// Optional hint to select rows from time-series.
111    pub series_row_selector: Option<TimeSeriesRowSelector>,
112    /// Optional constraint on the sequence number of the rows to read.
113    /// If set, only rows with a sequence number **lesser or equal** to this value
114    /// will be returned.
115    /// This is the effective memtable upper bound used by the scan, whether provided
116    /// explicitly or bound on scan open.
117    pub memtable_max_sequence: Option<SequenceNumber>,
118    /// Optional constraint on the minimal sequence number in the memtable.
119    /// If set, only the memtables that contain sequences **greater than** this value will be scanned
120    pub memtable_min_sequence: Option<SequenceNumber>,
121    /// Optional constraint on the minimal sequence number in the SST files.
122    /// If set, only the SST files that contain sequences greater than this value will be scanned.
123    pub sst_min_sequence: Option<SequenceNumber>,
124    /// Whether to bind the effective snapshot upper bound when opening the scan.
125    pub snapshot_on_scan: bool,
126    /// Optional hint for the distribution of time-series data.
127    pub distribution: Option<TimeSeriesDistribution>,
128    /// Optional hint for KNN vector search. When set, the scan should use
129    /// vector index to find the k nearest neighbors.
130    pub vector_search: Option<VectorSearchRequest>,
131}
132
133impl ScanRequest {
134    /// Returns the top-level projected column indices.
135    pub fn projection_indices(&self) -> Option<&[usize]> {
136        self.projection_input
137            .as_ref()
138            .map(|projection_input| projection_input.projection.as_slice())
139    }
140}
141
142impl Display for ScanRequest {
143    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
144        enum Delimiter {
145            None,
146            Init,
147        }
148
149        impl Delimiter {
150            fn as_str(&mut self) -> &str {
151                match self {
152                    Delimiter::None => {
153                        *self = Delimiter::Init;
154                        ""
155                    }
156                    Delimiter::Init => ", ",
157                }
158            }
159        }
160
161        let mut delimiter = Delimiter::None;
162
163        write!(f, "ScanRequest {{ ")?;
164        if let Some(projection) = &self.projection_input {
165            write!(f, "{}projection: {:?}", delimiter.as_str(), projection)?;
166        }
167        if !self.filters.is_empty() {
168            write!(
169                f,
170                "{}filters: [{}]",
171                delimiter.as_str(),
172                self.filters
173                    .iter()
174                    .map(|f| f.to_string())
175                    .collect::<Vec<_>>()
176                    .join(", ")
177            )?;
178        }
179        if let Some(output_ordering) = &self.output_ordering {
180            write!(
181                f,
182                "{}output_ordering: {:?}",
183                delimiter.as_str(),
184                output_ordering
185            )?;
186        }
187        if let Some(limit) = &self.limit {
188            write!(f, "{}limit: {}", delimiter.as_str(), limit)?;
189        }
190        if let Some(series_row_selector) = &self.series_row_selector {
191            write!(
192                f,
193                "{}series_row_selector: {}",
194                delimiter.as_str(),
195                series_row_selector
196            )?;
197        }
198        if let Some(sequence) = &self.memtable_max_sequence {
199            write!(f, "{}sequence: {}", delimiter.as_str(), sequence)?;
200        }
201        if let Some(sst_min_sequence) = &self.sst_min_sequence {
202            write!(
203                f,
204                "{}sst_min_sequence: {}",
205                delimiter.as_str(),
206                sst_min_sequence
207            )?;
208        }
209        if self.snapshot_on_scan {
210            write!(
211                f,
212                "{}snapshot_on_scan: {}",
213                delimiter.as_str(),
214                self.snapshot_on_scan
215            )?;
216        }
217        if let Some(distribution) = &self.distribution {
218            write!(f, "{}distribution: {}", delimiter.as_str(), distribution)?;
219        }
220        if let Some(vector_search) = &self.vector_search {
221            write!(
222                f,
223                "{}vector_search: column_id={}, k={}, metric={}",
224                delimiter.as_str(),
225                vector_search.column_id,
226                vector_search.k,
227                vector_search.metric
228            )?;
229        }
230        write!(f, " }}")
231    }
232}
233
234#[cfg(test)]
235mod tests {
236    use datafusion_expr::{Operator, binary_expr, col, lit};
237
238    use super::*;
239
240    #[test]
241    fn test_display_scan_request() {
242        let request = ScanRequest {
243            ..Default::default()
244        };
245        assert_eq!(request.to_string(), "ScanRequest {  }");
246
247        let projection_input = Some(vec![1, 2].into());
248        let request = ScanRequest {
249            projection_input,
250            filters: vec![
251                binary_expr(col("i"), Operator::Gt, lit(1)),
252                binary_expr(col("s"), Operator::Eq, lit("x")),
253            ],
254            limit: Some(10),
255            ..Default::default()
256        };
257        assert_eq!(
258            request.to_string(),
259            r#"ScanRequest { projection: ProjectionInput { projection: [1, 2], nested_paths: [] }, filters: [i > Int32(1), s = Utf8("x")], limit: 10 }"#
260        );
261
262        let request = ScanRequest {
263            filters: vec![
264                binary_expr(col("i"), Operator::Gt, lit(1)),
265                binary_expr(col("s"), Operator::Eq, lit("x")),
266            ],
267            limit: Some(10),
268            ..Default::default()
269        };
270        assert_eq!(
271            request.to_string(),
272            r#"ScanRequest { filters: [i > Int32(1), s = Utf8("x")], limit: 10 }"#
273        );
274
275        let projection_input = Some(vec![1, 2].into());
276        let request = ScanRequest {
277            projection_input,
278            limit: Some(10),
279            ..Default::default()
280        };
281        assert_eq!(
282            request.to_string(),
283            "ScanRequest { projection: ProjectionInput { projection: [1, 2], nested_paths: [] }, limit: 10 }"
284        );
285
286        let projection_input = Some(ProjectionInput::new(vec![1, 2]).with_nested_paths(vec![
287            vec!["j".to_string(), "a".to_string(), "b".to_string()],
288            vec!["s".to_string(), "x".to_string()],
289        ]));
290        let request = ScanRequest {
291            projection_input,
292            limit: Some(10),
293            ..Default::default()
294        };
295        assert_eq!(
296            request.to_string(),
297            r#"ScanRequest { projection: ProjectionInput { projection: [1, 2], nested_paths: [["j", "a", "b"], ["s", "x"]] }, limit: 10 }"#
298        );
299
300        let request = ScanRequest {
301            snapshot_on_scan: true,
302            ..Default::default()
303        };
304        assert_eq!(
305            request.to_string(),
306            "ScanRequest { snapshot_on_scan: true }"
307        );
308    }
309}