index/inverted_index/search/index_apply/
predicates_apply.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::mem::size_of;
16
17use async_trait::async_trait;
18use greptime_proto::v1::index::InvertedIndexMetas;
19
20use crate::bitmap::Bitmap;
21use crate::inverted_index::error::{IndexNotFoundSnafu, Result};
22use crate::inverted_index::format::reader::InvertedIndexReader;
23use crate::inverted_index::search::fst_apply::{
24    FstApplier, IntersectionFstApplier, KeysFstApplier,
25};
26use crate::inverted_index::search::fst_values_mapper::ParallelFstValuesMapper;
27use crate::inverted_index::search::index_apply::{
28    ApplyOutput, IndexApplier, IndexNotFoundStrategy, SearchContext,
29};
30use crate::inverted_index::search::predicate::Predicate;
31
/// Name that identifies a single index inside an inverted index blob; used as the lookup key
/// into the reader's metadata when applying predicates.
type IndexName = String;
33
/// `PredicatesIndexApplier` contains a collection of `FstApplier`s, each associated with an index name,
/// to process and filter index data based on compiled predicates.
///
/// When applied, the bitmaps produced for the individual indexes are intersected, so a segment
/// is reported as matched only if every applier matched it.
pub struct PredicatesIndexApplier {
    /// A list of `FstApplier`s, each associated with a specific index name
    /// (e.g. a tag field uses its column name as index name).
    ///
    /// The name is the key used to look up the index's metadata in the reader.
    fst_appliers: Vec<(IndexName, Box<dyn FstApplier>)>,
}
41
42#[async_trait]
43impl IndexApplier for PredicatesIndexApplier {
44    /// Applies all `FstApplier`s to the data in the inverted index reader, intersecting the individual
45    /// bitmaps obtained for each index to result in a final set of indices.
46    async fn apply<'a>(
47        &self,
48        context: SearchContext,
49        reader: &mut (dyn InvertedIndexReader + 'a),
50    ) -> Result<ApplyOutput> {
51        let metadata = reader.metadata().await?;
52        let mut output = ApplyOutput {
53            matched_segment_ids: Bitmap::new_bitvec(),
54            total_row_count: metadata.total_row_count as _,
55            segment_row_count: metadata.segment_row_count as _,
56        };
57
58        // TODO(zhongzc): optimize the order of applying to make it quicker to return empty.
59        let mut appliers = Vec::with_capacity(self.fst_appliers.len());
60        let mut fst_ranges = Vec::with_capacity(self.fst_appliers.len());
61
62        for (name, fst_applier) in &self.fst_appliers {
63            let Some(meta) = metadata.metas.get(name) else {
64                match context.index_not_found_strategy {
65                    IndexNotFoundStrategy::ReturnEmpty => {
66                        return Ok(output);
67                    }
68                    IndexNotFoundStrategy::Ignore => {
69                        continue;
70                    }
71                    IndexNotFoundStrategy::ThrowError => {
72                        return IndexNotFoundSnafu { name }.fail();
73                    }
74                }
75            };
76            let fst_offset = meta.base_offset + meta.relative_fst_offset as u64;
77            let fst_size = meta.fst_size as u64;
78            appliers.push((fst_applier, meta));
79            fst_ranges.push(fst_offset..fst_offset + fst_size);
80        }
81
82        if fst_ranges.is_empty() {
83            output.matched_segment_ids = Self::bitmap_full_range(&metadata);
84            return Ok(output);
85        }
86
87        let fsts = reader.fst_vec(&fst_ranges).await?;
88        let value_and_meta_vec = fsts
89            .into_iter()
90            .zip(appliers)
91            .map(|(fst, (fst_applier, meta))| (fst_applier.apply(&fst), meta))
92            .collect::<Vec<_>>();
93
94        let mut mapper = ParallelFstValuesMapper::new(reader);
95        let mut bm_vec = mapper.map_values_vec(&value_and_meta_vec).await?;
96
97        let mut bitmap = bm_vec.pop().unwrap(); // SAFETY: `fst_ranges` is not empty
98        for bm in bm_vec {
99            if bm.count_ones() == 0 {
100                break;
101            }
102
103            bitmap.intersect(bm);
104        }
105
106        output.matched_segment_ids = bitmap;
107        Ok(output)
108    }
109
110    /// Returns the memory usage of the applier.
111    fn memory_usage(&self) -> usize {
112        let mut size = self.fst_appliers.capacity() * size_of::<(IndexName, Box<dyn FstApplier>)>();
113        for (name, fst_applier) in &self.fst_appliers {
114            size += name.capacity();
115            size += fst_applier.memory_usage();
116        }
117        size
118    }
119}
120
121impl PredicatesIndexApplier {
122    /// Constructs an instance of `PredicatesIndexApplier` based on a list of tag predicates.
123    /// Chooses an appropriate `FstApplier` for each index name based on the nature of its predicates.
124    pub fn try_from(mut predicates: Vec<(IndexName, Vec<Predicate>)>) -> Result<Self> {
125        let mut fst_appliers = Vec::with_capacity(predicates.len());
126
127        // InList predicates are applied first to benefit from higher selectivity.
128        let in_list_index = predicates
129            .iter_mut()
130            .partition_in_place(|(_, ps)| ps.iter().any(|p| matches!(p, Predicate::InList(_))));
131        let mut iter = predicates.into_iter();
132        for _ in 0..in_list_index {
133            let (column_name, predicates) = iter.next().unwrap();
134            let fst_applier = Box::new(KeysFstApplier::try_from(predicates)?) as _;
135            fst_appliers.push((column_name, fst_applier));
136        }
137
138        for (column_name, predicates) in iter {
139            if predicates.is_empty() {
140                continue;
141            }
142            let fst_applier = Box::new(IntersectionFstApplier::try_from(predicates)?) as _;
143            fst_appliers.push((column_name, fst_applier));
144        }
145
146        Ok(PredicatesIndexApplier { fst_appliers })
147    }
148
149    /// Creates a `Bitmap` representing the full range of data in the index for initial scanning.
150    fn bitmap_full_range(metadata: &InvertedIndexMetas) -> Bitmap {
151        let total_count = metadata.total_row_count;
152        let segment_count = metadata.segment_row_count;
153        let len = total_count.div_ceil(segment_count);
154        Bitmap::full_bitvec(len as _)
155    }
156}
157
/// Allows building a `PredicatesIndexApplier` through the standard `TryFrom`/`TryInto` traits
/// from a list of `(index name, predicates)` pairs.
impl TryFrom<Vec<(String, Vec<Predicate>)>> for PredicatesIndexApplier {
    type Error = crate::inverted_index::error::Error;
    /// Delegates to the inherent `PredicatesIndexApplier::try_from`.
    fn try_from(predicates: Vec<(String, Vec<Predicate>)>) -> Result<Self> {
        // `Self::try_from` resolves to the inherent associated function (inherent items take
        // precedence over trait items), so this does not recurse into the trait method.
        Self::try_from(predicates)
    }
}
164
/// Unit tests for `PredicatesIndexApplier`, driven by mocked index readers and FST appliers.
#[cfg(test)]
mod tests {
    use std::collections::VecDeque;
    use std::sync::Arc;

    use greptime_proto::v1::index::{BitmapType, InvertedIndexMeta};

    use super::*;
    use crate::bitmap::Bitmap;
    use crate::inverted_index::FstMap;
    use crate::inverted_index::error::Error;
    use crate::inverted_index::format::reader::MockInvertedIndexReader;
    use crate::inverted_index::search::fst_apply::MockFstApplier;

    /// Shorthand for turning a string literal into an owned `String`.
    fn s(s: &'static str) -> String {
        s.to_owned()
    }

    /// Builds index metadata describing 8 rows with 1 row per segment (i.e. 8 segments) and one
    /// roaring-bitmap index per `(tag, idx)` pair, where `idx` is stored as the index's
    /// `relative_fst_offset`.
    ///
    /// All other fields keep their defaults; the tests rely on the resulting FST range start
    /// being equal to `idx` (presumably because the default `base_offset` is zero).
    fn mock_metas(tags: impl IntoIterator<Item = (&'static str, u32)>) -> Arc<InvertedIndexMetas> {
        let mut metas = InvertedIndexMetas {
            total_row_count: 8,
            segment_row_count: 1,
            ..Default::default()
        };
        for (tag, idx) in tags.into_iter() {
            let meta = InvertedIndexMeta {
                name: s(tag),
                relative_fst_offset: idx,
                bitmap_type: BitmapType::Roaring.into(),
                ..Default::default()
            };
            metas.metas.insert(s(tag), meta);
        }
        Arc::new(metas)
    }

    /// Returns a mocked `FstApplier` that point-gets `value` from the FST: it yields the FST
    /// value stored under that key if present, and nothing otherwise.
    fn key_fst_applier(value: &'static str) -> Box<dyn FstApplier> {
        let mut mock_fst_applier = MockFstApplier::new();
        mock_fst_applier
            .expect_apply()
            .returning(move |fst| fst.get(value).into_iter().collect());
        Box::new(mock_fst_applier)
    }

    /// Packs a bitmap's `offset` and `size` (two `u32`s) into the single `u64` stored as an
    /// FST value.
    fn fst_value(offset: u32, size: u32) -> u64 {
        bytemuck::cast::<_, u64>([offset, size])
    }

    /// A single point-get applier returns the bitmap of the matching value, and an empty
    /// result when the value is absent from the FST.
    #[tokio::test]
    async fn test_index_applier_apply_get_key() {
        // An index applier that point-gets "tag-0_value-0" on tag "tag-0"
        let applier = PredicatesIndexApplier {
            fst_appliers: vec![(s("tag-0"), key_fst_applier("tag-0_value-0"))],
        };

        // An index reader with a single tag "tag-0" and a corresponding value "tag-0_value-0"
        let mut mock_reader = MockInvertedIndexReader::new();
        mock_reader
            .expect_metadata()
            .returning(|| Ok(mock_metas([("tag-0", 0)])));
        mock_reader.expect_fst_vec().returning(|_ranges| {
            Ok(vec![
                FstMap::from_iter([(b"tag-0_value-0", fst_value(2, 1))]).unwrap(),
            ])
        });

        // The FST value (offset 2, size 1) must be translated into the bitmap range 2..3.
        mock_reader.expect_bitmap_deque().returning(|arg| {
            assert_eq!(arg.len(), 1);
            let range = &arg[0].0;
            let bitmap_type = arg[0].1;
            assert_eq!(*range, 2..3);
            assert_eq!(bitmap_type, BitmapType::Roaring);
            Ok(VecDeque::from([Bitmap::from_lsb0_bytes(
                &[0b10101010],
                bitmap_type,
            )]))
        });
        let output = applier
            .apply(SearchContext::default(), &mut mock_reader)
            .await
            .unwrap();
        assert_eq!(
            output.matched_segment_ids,
            Bitmap::from_lsb0_bytes(&[0b10101010], BitmapType::Roaring)
        );

        // An index reader with a single tag "tag-0" but without value "tag-0_value-0"
        // (no `expect_bitmap_deque` is registered on this reader, so presumably no bitmap
        // may be read in this case)
        let mut mock_reader = MockInvertedIndexReader::new();
        mock_reader
            .expect_metadata()
            .returning(|| Ok(mock_metas([("tag-0", 0)])));
        mock_reader.expect_fst_vec().returning(|_range| {
            Ok(vec![
                FstMap::from_iter([(b"tag-0_value-1", fst_value(2, 1))]).unwrap(),
            ])
        });
        let output = applier
            .apply(SearchContext::default(), &mut mock_reader)
            .await
            .unwrap();
        assert_eq!(output.matched_segment_ids.count_ones(), 0);
    }

    /// Two appliers on two different tags: the result is the intersection of both bitmaps.
    #[tokio::test]
    async fn test_index_applier_apply_intersection_with_two_tags() {
        // An index applier that intersects "tag-0_value-0" on tag "tag-0" and "tag-1_value-a" on tag "tag-1"
        let applier = PredicatesIndexApplier {
            fst_appliers: vec![
                (s("tag-0"), key_fst_applier("tag-0_value-0")),
                (s("tag-1"), key_fst_applier("tag-1_value-a")),
            ],
        };

        // An index reader with two tags "tag-0" and "tag-1" and respective values "tag-0_value-0" and "tag-1_value-a"
        let mut mock_reader = MockInvertedIndexReader::new();
        mock_reader
            .expect_metadata()
            .returning(|| Ok(mock_metas([("tag-0", 0), ("tag-1", 1)])));
        // The start of each requested range identifies which tag's FST is being fetched.
        mock_reader.expect_fst_vec().returning(|ranges| {
            let mut output = vec![];
            for range in ranges {
                match range.start {
                    0 => output
                        .push(FstMap::from_iter([(b"tag-0_value-0", fst_value(1, 1))]).unwrap()),
                    1 => output
                        .push(FstMap::from_iter([(b"tag-1_value-a", fst_value(2, 1))]).unwrap()),
                    _ => unreachable!(),
                }
            }
            Ok(output)
        });
        // Bitmap at offset 1 belongs to "tag-0_value-0", bitmap at offset 2 to "tag-1_value-a".
        mock_reader.expect_bitmap_deque().returning(|ranges| {
            let mut output = VecDeque::new();
            for (range, bitmap_type) in ranges {
                let offset = range.start;
                let size = range.end - range.start;
                match (offset, size, bitmap_type) {
                    (1, 1, BitmapType::Roaring) => {
                        output.push_back(Bitmap::from_lsb0_bytes(&[0b10101010], *bitmap_type))
                    }
                    (2, 1, BitmapType::Roaring) => {
                        output.push_back(Bitmap::from_lsb0_bytes(&[0b11011011], *bitmap_type))
                    }
                    _ => unreachable!(),
                }
            }

            Ok(output)
        });

        let output = applier
            .apply(SearchContext::default(), &mut mock_reader)
            .await
            .unwrap();
        // 0b10101010 & 0b11011011 == 0b10001010
        assert_eq!(
            output.matched_segment_ids,
            Bitmap::from_lsb0_bytes(&[0b10001010], BitmapType::Roaring)
        );
    }

    /// With no appliers configured, every segment is reported as matched.
    #[tokio::test]
    async fn test_index_applier_without_predicates() {
        let applier = PredicatesIndexApplier {
            fst_appliers: vec![],
        };

        let mut mock_reader: MockInvertedIndexReader = MockInvertedIndexReader::new();
        mock_reader
            .expect_metadata()
            .returning(|| Ok(mock_metas([("tag-0", 0)])));

        let output = applier
            .apply(SearchContext::default(), &mut mock_reader)
            .await
            .unwrap();
        assert_eq!(output.matched_segment_ids, Bitmap::full_bitvec(8)); // full range to scan
    }

    /// Metadata with zero rows and no indexes: the applier is never invoked and, under the
    /// default search context, nothing is matched.
    #[tokio::test]
    async fn test_index_applier_with_empty_index() {
        let mut mock_reader = MockInvertedIndexReader::new();
        mock_reader.expect_metadata().returning(move || {
            Ok(Arc::new(InvertedIndexMetas {
                total_row_count: 0, // No rows
                segment_row_count: 1,
                ..Default::default()
            }))
        });

        let mut mock_fst_applier = MockFstApplier::new();
        mock_fst_applier.expect_apply().never();

        let applier = PredicatesIndexApplier {
            fst_appliers: vec![(s("tag-0"), Box::new(mock_fst_applier))],
        };

        let output = applier
            .apply(SearchContext::default(), &mut mock_reader)
            .await
            .unwrap();
        assert!(output.matched_segment_ids.is_empty());
    }

    /// Exercises all three `IndexNotFoundStrategy` variants against a reader that has no
    /// index for the requested tag; the FST applier must never be invoked.
    #[tokio::test]
    async fn test_index_applier_with_nonexistent_index() {
        let mut mock_reader = MockInvertedIndexReader::new();
        mock_reader
            .expect_metadata()
            .returning(|| Ok(mock_metas(vec![])));

        let mut mock_fst_applier = MockFstApplier::new();
        mock_fst_applier.expect_apply().never();

        let applier = PredicatesIndexApplier {
            fst_appliers: vec![(s("tag-0"), Box::new(mock_fst_applier))],
        };

        // ThrowError: the missing index surfaces as an `IndexNotFound` error.
        let result = applier
            .apply(
                SearchContext {
                    index_not_found_strategy: IndexNotFoundStrategy::ThrowError,
                },
                &mut mock_reader,
            )
            .await;
        assert!(matches!(result, Err(Error::IndexNotFound { .. })));

        // ReturnEmpty: no segment is matched.
        let output = applier
            .apply(
                SearchContext {
                    index_not_found_strategy: IndexNotFoundStrategy::ReturnEmpty,
                },
                &mut mock_reader,
            )
            .await
            .unwrap();
        assert!(output.matched_segment_ids.is_empty());

        // Ignore: the only applier is skipped, so the full range of 8 segments is matched.
        let output = applier
            .apply(
                SearchContext {
                    index_not_found_strategy: IndexNotFoundStrategy::Ignore,
                },
                &mut mock_reader,
            )
            .await
            .unwrap();
        assert_eq!(output.matched_segment_ids, Bitmap::full_bitvec(8));
    }

    /// Memory usage is the sum of one list slot, the index name's bytes and the applier's
    /// own reported usage.
    #[test]
    fn test_index_applier_memory_usage() {
        let mut mock_fst_applier = MockFstApplier::new();
        mock_fst_applier.expect_memory_usage().returning(|| 100);

        let applier = PredicatesIndexApplier {
            fst_appliers: vec![(s("tag-0"), Box::new(mock_fst_applier))],
        };

        // one slot + 5 bytes for "tag-0" + 100 bytes reported by the mocked applier
        assert_eq!(
            applier.memory_usage(),
            size_of::<(IndexName, Box<dyn FstApplier>)>() + 5 + 100
        );
    }
}