Skip to main content

store_api/storage/
requests.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::fmt::{Display, Formatter};
17
18use common_error::ext::BoxedError;
19use common_recordbatch::OrderOption;
20use datafusion_expr::expr::Expr;
21// Re-export vector types from datatypes to avoid duplication
22pub use datatypes::schema::{VectorDistanceMetric, VectorIndexEngineType};
23use datatypes::types::json_type::JsonNativeType;
24use itertools::Itertools;
25use strum::Display;
26
27use crate::storage::{ColumnId, ProjectionInput, SequenceNumber};
28
29/// A hint for KNN vector search.
30#[derive(Debug, Clone, PartialEq)]
31pub struct VectorSearchRequest {
32    /// Column ID of the vector column to search.
33    pub column_id: ColumnId,
34    /// The query vector to search for.
35    pub query_vector: Vec<f32>,
36    /// Number of nearest neighbors to return.
37    pub k: usize,
38    /// Distance metric to use (matches the index metric).
39    pub metric: VectorDistanceMetric,
40}
41
/// Results returned by a vector index search.
///
/// NOTE(review): `keys` and `distances` presumably line up index-by-index
/// (`distances[i]` belongs to `keys[i]`) — confirm against the engine implementations.
#[derive(Clone, Debug, PartialEq)]
pub struct VectorSearchMatches {
    /// Matched keys, i.e. row offsets within the index.
    pub keys: Vec<u64>,
    /// Distance of each match from the query vector.
    pub distances: Vec<f32>,
}
50
51/// Trait for vector index engines (HNSW implementations).
52///
53/// This trait defines the interface for pluggable vector index engines.
54/// Implementations (e.g., UsearchEngine) are provided by storage engines like mito2.
55pub trait VectorIndexEngine: Send + Sync {
56    /// Adds a vector with the given key.
57    fn add(&mut self, key: u64, vector: &[f32]) -> Result<(), BoxedError>;
58
59    /// Searches for k nearest neighbors.
60    fn search(&self, query: &[f32], k: usize) -> Result<VectorSearchMatches, BoxedError>;
61
62    /// Returns the serialized length.
63    fn serialized_length(&self) -> usize;
64
65    /// Serializes the index to a buffer.
66    fn save_to_buffer(&self, buffer: &mut [u8]) -> Result<(), BoxedError>;
67
68    /// Reserves capacity for vectors.
69    fn reserve(&mut self, capacity: usize) -> Result<(), BoxedError>;
70
71    /// Returns current size (number of vectors).
72    fn size(&self) -> usize;
73
74    /// Returns current capacity.
75    fn capacity(&self) -> usize;
76
77    /// Returns memory usage in bytes.
78    fn memory_usage(&self) -> usize;
79}
80
81/// A hint on how to select rows from a time-series.
82#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Display)]
83pub enum TimeSeriesRowSelector {
84    /// Only keep the last row of each time-series.
85    LastRow,
86}
87
88/// A hint on how to distribute time-series data on the scan output.
89#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Display)]
90pub enum TimeSeriesDistribution {
91    /// Data are distributed by time window first. The scanner will
92    /// return all data within one time window before moving to the next one.
93    TimeWindowed,
94    /// Data are organized by time-series first. The scanner will return
95    /// all data for one time-series before moving to the next one.
96    PerSeries,
97}
98
99#[derive(Default, Clone, Debug, PartialEq)]
100pub struct ScanRequest {
101    /// Optional projection information for the scan. `None` reads all root
102    /// columns.
103    pub projection_input: Option<ProjectionInput>,
104    /// Filters pushed down
105    pub filters: Vec<Expr>,
106    /// Expected output ordering. This is only a hint and isn't guaranteed.
107    pub output_ordering: Option<Vec<OrderOption>>,
108    /// limit can be used to reduce the amount scanned
109    /// from the datasource as a performance optimization.
110    /// If set, it contains the amount of rows needed by the caller,
111    /// The data source should return *at least* this number of rows if available.
112    pub limit: Option<usize>,
113    /// Optional hint to select rows from time-series.
114    pub series_row_selector: Option<TimeSeriesRowSelector>,
115    /// Optional constraint on the sequence number of the rows to read.
116    /// If set, only rows with a sequence number **lesser or equal** to this value
117    /// will be returned.
118    /// This is the effective memtable upper bound used by the scan, whether provided
119    /// explicitly or bound on scan open.
120    pub memtable_max_sequence: Option<SequenceNumber>,
121    /// Optional constraint on the minimal sequence number in the memtable.
122    /// If set, only the memtables that contain sequences **greater than** this value will be scanned
123    pub memtable_min_sequence: Option<SequenceNumber>,
124    /// Optional constraint on the minimal sequence number in the SST files.
125    /// If set, only the SST files that contain sequences greater than this value will be scanned.
126    pub sst_min_sequence: Option<SequenceNumber>,
127    /// Whether to skip all SST files.
128    /// This is stronger than `sst_min_sequence` and also skips SST files without sequence metadata.
129    pub skip_sst_files: bool,
130    /// Whether to bind the effective snapshot upper bound when opening the scan.
131    pub snapshot_on_scan: bool,
132    /// Optional hint for the distribution of time-series data.
133    pub distribution: Option<TimeSeriesDistribution>,
134    /// Optional hint for KNN vector search. When set, the scan should use
135    /// vector index to find the k nearest neighbors.
136    pub vector_search: Option<VectorSearchRequest>,
137    /// Optional hint from query-driven JSON type concretization.
138    pub json_type_hint: HashMap<String, JsonNativeType>,
139}
140
141impl ScanRequest {
142    /// Returns the top-level projected column indices.
143    pub fn projection_indices(&self) -> Option<&[usize]> {
144        self.projection_input
145            .as_ref()
146            .map(|projection_input| projection_input.projection.as_slice())
147    }
148}
149
150impl Display for ScanRequest {
151    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
152        enum Delimiter {
153            None,
154            Init,
155        }
156
157        impl Delimiter {
158            fn as_str(&mut self) -> &str {
159                match self {
160                    Delimiter::None => {
161                        *self = Delimiter::Init;
162                        ""
163                    }
164                    Delimiter::Init => ", ",
165                }
166            }
167        }
168
169        let mut delimiter = Delimiter::None;
170
171        write!(f, "ScanRequest {{ ")?;
172        if let Some(projection) = &self.projection_input {
173            write!(f, "{}projection: {:?}", delimiter.as_str(), projection)?;
174        }
175        if !self.filters.is_empty() {
176            write!(
177                f,
178                "{}filters: [{}]",
179                delimiter.as_str(),
180                self.filters
181                    .iter()
182                    .map(|f| f.to_string())
183                    .collect::<Vec<_>>()
184                    .join(", ")
185            )?;
186        }
187        if let Some(output_ordering) = &self.output_ordering {
188            write!(
189                f,
190                "{}output_ordering: {:?}",
191                delimiter.as_str(),
192                output_ordering
193            )?;
194        }
195        if let Some(limit) = &self.limit {
196            write!(f, "{}limit: {}", delimiter.as_str(), limit)?;
197        }
198        if let Some(series_row_selector) = &self.series_row_selector {
199            write!(
200                f,
201                "{}series_row_selector: {}",
202                delimiter.as_str(),
203                series_row_selector
204            )?;
205        }
206        if let Some(sequence) = &self.memtable_max_sequence {
207            write!(f, "{}sequence: {}", delimiter.as_str(), sequence)?;
208        }
209        if let Some(sst_min_sequence) = &self.sst_min_sequence {
210            write!(
211                f,
212                "{}sst_min_sequence: {}",
213                delimiter.as_str(),
214                sst_min_sequence
215            )?;
216        }
217        if self.skip_sst_files {
218            write!(
219                f,
220                "{}skip_sst_files: {}",
221                delimiter.as_str(),
222                self.skip_sst_files
223            )?;
224        }
225        if self.snapshot_on_scan {
226            write!(
227                f,
228                "{}snapshot_on_scan: {}",
229                delimiter.as_str(),
230                self.snapshot_on_scan
231            )?;
232        }
233        if let Some(distribution) = &self.distribution {
234            write!(f, "{}distribution: {}", delimiter.as_str(), distribution)?;
235        }
236        if let Some(vector_search) = &self.vector_search {
237            write!(
238                f,
239                "{}vector_search: column_id={}, k={}, metric={}",
240                delimiter.as_str(),
241                vector_search.column_id,
242                vector_search.k,
243                vector_search.metric
244            )?;
245        }
246        if !self.json_type_hint.is_empty() {
247            write!(
248                f,
249                "{}json_type_hint: {}",
250                delimiter.as_str(),
251                self.json_type_hint
252                    .iter()
253                    .map(|(column, json_type)| format!("({column}: {json_type})"))
254                    .join(", ")
255            )?;
256        }
257        write!(f, " }}")
258    }
259}
260
#[cfg(test)]
mod tests {
    use datafusion_expr::{Operator, binary_expr, col, lit};

    use super::*;

    /// Builds the two predicates shared by several display cases.
    fn sample_filters() -> Vec<Expr> {
        vec![
            binary_expr(col("i"), Operator::Gt, lit(1)),
            binary_expr(col("s"), Operator::Eq, lit("x")),
        ]
    }

    #[test]
    fn test_display_scan_request() {
        // Each case pairs a request with its exact expected rendering.
        let cases = [
            // Nothing set: only the braces are printed.
            (ScanRequest::default(), "ScanRequest {  }"),
            // Projection, filters and limit together.
            (
                ScanRequest {
                    projection_input: Some(vec![1, 2].into()),
                    filters: sample_filters(),
                    limit: Some(10),
                    ..Default::default()
                },
                r#"ScanRequest { projection: ProjectionInput { projection: [1, 2], nested_paths: [] }, filters: [i > Int32(1), s = Utf8("x")], limit: 10 }"#,
            ),
            // Filters and limit without a projection.
            (
                ScanRequest {
                    filters: sample_filters(),
                    limit: Some(10),
                    ..Default::default()
                },
                r#"ScanRequest { filters: [i > Int32(1), s = Utf8("x")], limit: 10 }"#,
            ),
            // Projection and limit without filters.
            (
                ScanRequest {
                    projection_input: Some(vec![1, 2].into()),
                    limit: Some(10),
                    ..Default::default()
                },
                "ScanRequest { projection: ProjectionInput { projection: [1, 2], nested_paths: [] }, limit: 10 }",
            ),
            // Projection carrying nested paths.
            (
                ScanRequest {
                    projection_input: Some(ProjectionInput::new(vec![1, 2]).with_nested_paths(
                        vec![
                            vec!["j".to_string(), "a".to_string(), "b".to_string()],
                            vec!["s".to_string(), "x".to_string()],
                        ],
                    )),
                    limit: Some(10),
                    ..Default::default()
                },
                r#"ScanRequest { projection: ProjectionInput { projection: [1, 2], nested_paths: [["j", "a", "b"], ["s", "x"]] }, limit: 10 }"#,
            ),
            // Boolean flags are printed only when true.
            (
                ScanRequest {
                    snapshot_on_scan: true,
                    ..Default::default()
                },
                "ScanRequest { snapshot_on_scan: true }",
            ),
            (
                ScanRequest {
                    skip_sst_files: true,
                    ..Default::default()
                },
                "ScanRequest { skip_sst_files: true }",
            ),
        ];

        for (index, (request, expected)) in cases.into_iter().enumerate() {
            assert_eq!(request.to_string(), expected, "display case #{index}");
        }
    }
}