index/inverted_index/search/index_apply/predicates_apply.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

15use std::mem::size_of;
16
17use async_trait::async_trait;
18use greptime_proto::v1::index::InvertedIndexMetas;
19
20use crate::bitmap::Bitmap;
21use crate::inverted_index::error::{IndexNotFoundSnafu, Result};
22use crate::inverted_index::format::reader::InvertedIndexReader;
23use crate::inverted_index::search::fst_apply::{
24    FstApplier, IntersectionFstApplier, KeysFstApplier,
25};
26use crate::inverted_index::search::fst_values_mapper::ParallelFstValuesMapper;
27use crate::inverted_index::search::index_apply::{
28    ApplyOutput, IndexApplier, IndexNotFoundStrategy, SearchContext,
29};
30use crate::inverted_index::search::predicate::Predicate;
31
/// Name of an index, used as the lookup key into the inverted index metadata
/// (e.g. a tag field uses its column name as index name).
type IndexName = String;
33
/// `PredicatesIndexApplier` contains a collection of `FstApplier`s, each associated with an index name,
/// to process and filter index data based on compiled predicates.
///
/// When applied, the per-index results are intersected: a segment is reported as matched only
/// if it is matched by every index that was actually applied.
pub struct PredicatesIndexApplier {
    /// A list of `FstApplier`s, each associated with a specific index name
    /// (e.g. a tag field uses its column name as index name).
    ///
    /// When built through `try_from`, indexes carrying `InList` predicates come first.
    fst_appliers: Vec<(IndexName, Box<dyn FstApplier>)>,
}
41
42#[async_trait]
43impl IndexApplier for PredicatesIndexApplier {
44    /// Applies all `FstApplier`s to the data in the inverted index reader, intersecting the individual
45    /// bitmaps obtained for each index to result in a final set of indices.
46    async fn apply<'a>(
47        &self,
48        context: SearchContext,
49        reader: &mut (dyn InvertedIndexReader + 'a),
50    ) -> Result<ApplyOutput> {
51        let metadata = reader.metadata().await?;
52        let mut output = ApplyOutput {
53            matched_segment_ids: Bitmap::new_bitvec(),
54            total_row_count: metadata.total_row_count as _,
55            segment_row_count: metadata.segment_row_count as _,
56        };
57
58        // TODO(zhongzc): optimize the order of applying to make it quicker to return empty.
59        let mut appliers = Vec::with_capacity(self.fst_appliers.len());
60        let mut fst_ranges = Vec::with_capacity(self.fst_appliers.len());
61
62        for (name, fst_applier) in &self.fst_appliers {
63            let Some(meta) = metadata.metas.get(name) else {
64                match context.index_not_found_strategy {
65                    IndexNotFoundStrategy::ReturnEmpty => {
66                        return Ok(output);
67                    }
68                    IndexNotFoundStrategy::Ignore => {
69                        continue;
70                    }
71                    IndexNotFoundStrategy::ThrowError => {
72                        return IndexNotFoundSnafu { name }.fail();
73                    }
74                }
75            };
76            let fst_offset = meta.base_offset + meta.relative_fst_offset as u64;
77            let fst_size = meta.fst_size as u64;
78            appliers.push((fst_applier, meta));
79            fst_ranges.push(fst_offset..fst_offset + fst_size);
80        }
81
82        if fst_ranges.is_empty() {
83            output.matched_segment_ids = Self::bitmap_full_range(&metadata);
84            return Ok(output);
85        }
86
87        let fsts = reader.fst_vec(&fst_ranges).await?;
88        let value_and_meta_vec = fsts
89            .into_iter()
90            .zip(appliers)
91            .map(|(fst, (fst_applier, meta))| (fst_applier.apply(&fst), meta))
92            .collect::<Vec<_>>();
93
94        let mut mapper = ParallelFstValuesMapper::new(reader);
95        let mut bm_vec = mapper.map_values_vec(&value_and_meta_vec).await?;
96
97        let mut bitmap = bm_vec.pop().unwrap(); // SAFETY: `fst_ranges` is not empty
98        for bm in bm_vec {
99            if bm.count_ones() == 0 {
100                break;
101            }
102
103            bitmap.intersect(bm);
104        }
105
106        output.matched_segment_ids = bitmap;
107        Ok(output)
108    }
109
110    /// Returns the memory usage of the applier.
111    fn memory_usage(&self) -> usize {
112        let mut size = self.fst_appliers.capacity() * size_of::<(IndexName, Box<dyn FstApplier>)>();
113        for (name, fst_applier) in &self.fst_appliers {
114            size += name.capacity();
115            size += fst_applier.memory_usage();
116        }
117        size
118    }
119}
120
121impl PredicatesIndexApplier {
122    /// Constructs an instance of `PredicatesIndexApplier` based on a list of tag predicates.
123    /// Chooses an appropriate `FstApplier` for each index name based on the nature of its predicates.
124    pub fn try_from(mut predicates: Vec<(IndexName, Vec<Predicate>)>) -> Result<Self> {
125        let mut fst_appliers = Vec::with_capacity(predicates.len());
126
127        // InList predicates are applied first to benefit from higher selectivity.
128        let in_list_index = predicates
129            .iter_mut()
130            .partition_in_place(|(_, ps)| ps.iter().any(|p| matches!(p, Predicate::InList(_))));
131        let mut iter = predicates.into_iter();
132        for _ in 0..in_list_index {
133            let (column_name, predicates) = iter.next().unwrap();
134            let fst_applier = Box::new(KeysFstApplier::try_from(predicates)?) as _;
135            fst_appliers.push((column_name, fst_applier));
136        }
137
138        for (column_name, predicates) in iter {
139            if predicates.is_empty() {
140                continue;
141            }
142            let fst_applier = Box::new(IntersectionFstApplier::try_from(predicates)?) as _;
143            fst_appliers.push((column_name, fst_applier));
144        }
145
146        Ok(PredicatesIndexApplier { fst_appliers })
147    }
148
149    /// Creates a `Bitmap` representing the full range of data in the index for initial scanning.
150    fn bitmap_full_range(metadata: &InvertedIndexMetas) -> Bitmap {
151        let total_count = metadata.total_row_count;
152        let segment_count = metadata.segment_row_count;
153        let len = total_count.div_ceil(segment_count);
154        Bitmap::full_bitvec(len as _)
155    }
156}
157
158impl TryFrom<Vec<(String, Vec<Predicate>)>> for PredicatesIndexApplier {
159    type Error = crate::inverted_index::error::Error;
160    fn try_from(predicates: Vec<(String, Vec<Predicate>)>) -> Result<Self> {
161        Self::try_from(predicates)
162    }
163}
164
/// Unit tests for `PredicatesIndexApplier`, driven by mocked readers and FST appliers.
#[cfg(test)]
mod tests {
    use std::collections::VecDeque;
    use std::sync::Arc;

    use greptime_proto::v1::index::{BitmapType, InvertedIndexMeta};

    use super::*;
    use crate::bitmap::Bitmap;
    use crate::inverted_index::error::Error;
    use crate::inverted_index::format::reader::MockInvertedIndexReader;
    use crate::inverted_index::search::fst_apply::MockFstApplier;
    use crate::inverted_index::FstMap;

    /// Shorthand for turning a string literal into an owned `String`.
    fn s(s: &'static str) -> String {
        s.to_owned()
    }

    /// Builds index metadata with 8 rows in segments of 1 row (so 8 segments), plus one
    /// Roaring-bitmap index per `(tag, idx)` pair whose `relative_fst_offset` is `idx`.
    /// `fst_size` is left at its default, so the tests distinguish FSTs by range start.
    fn mock_metas(tags: impl IntoIterator<Item = (&'static str, u32)>) -> Arc<InvertedIndexMetas> {
        let mut metas = InvertedIndexMetas {
            total_row_count: 8,
            segment_row_count: 1,
            ..Default::default()
        };
        for (tag, idx) in tags.into_iter() {
            let meta = InvertedIndexMeta {
                name: s(tag),
                relative_fst_offset: idx,
                bitmap_type: BitmapType::Roaring.into(),
                ..Default::default()
            };
            metas.metas.insert(s(tag), meta);
        }
        Arc::new(metas)
    }

    /// Mock `FstApplier` that point-gets `value` from the FST, yielding its value if present.
    fn key_fst_applier(value: &'static str) -> Box<dyn FstApplier> {
        let mut mock_fst_applier = MockFstApplier::new();
        mock_fst_applier
            .expect_apply()
            .returning(move |fst| fst.get(value).into_iter().collect());
        Box::new(mock_fst_applier)
    }

    /// Packs a bitmap location `(offset, size)` into a `u64` FST value; the tests below
    /// expect `fst_value(2, 1)` to be resolved to the bitmap byte range `2..3`.
    fn fst_value(offset: u32, size: u32) -> u64 {
        bytemuck::cast::<_, u64>([offset, size])
    }

    #[tokio::test]
    async fn test_index_applier_apply_get_key() {
        // An index applier that point-gets "tag-0_value-0" on tag "tag-0"
        let applier = PredicatesIndexApplier {
            fst_appliers: vec![(s("tag-0"), key_fst_applier("tag-0_value-0"))],
        };

        // An index reader with a single tag "tag-0" and a corresponding value "tag-0_value-0"
        let mut mock_reader = MockInvertedIndexReader::new();
        mock_reader
            .expect_metadata()
            .returning(|| Ok(mock_metas([("tag-0", 0)])));
        mock_reader.expect_fst_vec().returning(|_ranges| {
            Ok(vec![FstMap::from_iter([(
                b"tag-0_value-0",
                fst_value(2, 1),
            )])
            .unwrap()])
        });

        // The single matched value must be turned into exactly one bitmap read at bytes 2..3.
        mock_reader.expect_bitmap_deque().returning(|arg| {
            assert_eq!(arg.len(), 1);
            let range = &arg[0].0;
            let bitmap_type = arg[0].1;
            assert_eq!(*range, 2..3);
            assert_eq!(bitmap_type, BitmapType::Roaring);
            Ok(VecDeque::from([Bitmap::from_lsb0_bytes(
                &[0b10101010],
                bitmap_type,
            )]))
        });
        let output = applier
            .apply(SearchContext::default(), &mut mock_reader)
            .await
            .unwrap();
        // With a single index the result is that index's bitmap unchanged.
        assert_eq!(
            output.matched_segment_ids,
            Bitmap::from_lsb0_bytes(&[0b10101010], BitmapType::Roaring)
        );

        // An index reader with a single tag "tag-0" but without value "tag-0_value-0"
        // No `bitmap_deque` expectation is set here, so the test also relies on no bitmap
        // being read when the FST lookup finds nothing.
        let mut mock_reader = MockInvertedIndexReader::new();
        mock_reader
            .expect_metadata()
            .returning(|| Ok(mock_metas([("tag-0", 0)])));
        mock_reader.expect_fst_vec().returning(|_range| {
            Ok(vec![FstMap::from_iter([(
                b"tag-0_value-1",
                fst_value(2, 1),
            )])
            .unwrap()])
        });
        let output = applier
            .apply(SearchContext::default(), &mut mock_reader)
            .await
            .unwrap();
        assert_eq!(output.matched_segment_ids.count_ones(), 0);
    }

    #[tokio::test]
    async fn test_index_applier_apply_intersection_with_two_tags() {
        // An index applier that intersects "tag-0_value-0" on tag "tag-0" and "tag-1_value-a" on tag "tag-1"
        let applier = PredicatesIndexApplier {
            fst_appliers: vec![
                (s("tag-0"), key_fst_applier("tag-0_value-0")),
                (s("tag-1"), key_fst_applier("tag-1_value-a")),
            ],
        };

        // An index reader with two tags "tag-0" and "tag-1" and respective values "tag-0_value-0" and "tag-1_value-a"
        let mut mock_reader = MockInvertedIndexReader::new();
        mock_reader
            .expect_metadata()
            .returning(|| Ok(mock_metas([("tag-0", 0), ("tag-1", 1)])));
        // FSTs are told apart by range start, which equals the `relative_fst_offset` given above.
        mock_reader.expect_fst_vec().returning(|ranges| {
            let mut output = vec![];
            for range in ranges {
                match range.start {
                    0 => output
                        .push(FstMap::from_iter([(b"tag-0_value-0", fst_value(1, 1))]).unwrap()),
                    1 => output
                        .push(FstMap::from_iter([(b"tag-1_value-a", fst_value(2, 1))]).unwrap()),
                    _ => unreachable!(),
                }
            }
            Ok(output)
        });
        // Bitmaps are told apart by the `(offset, size)` packed into the FST values above.
        mock_reader.expect_bitmap_deque().returning(|ranges| {
            let mut output = VecDeque::new();
            for (range, bitmap_type) in ranges {
                let offset = range.start;
                let size = range.end - range.start;
                match (offset, size, bitmap_type) {
                    (1, 1, BitmapType::Roaring) => {
                        output.push_back(Bitmap::from_lsb0_bytes(&[0b10101010], *bitmap_type))
                    }
                    (2, 1, BitmapType::Roaring) => {
                        output.push_back(Bitmap::from_lsb0_bytes(&[0b11011011], *bitmap_type))
                    }
                    _ => unreachable!(),
                }
            }

            Ok(output)
        });

        let output = applier
            .apply(SearchContext::default(), &mut mock_reader)
            .await
            .unwrap();
        // 0b10101010 AND 0b11011011 == 0b10001010
        assert_eq!(
            output.matched_segment_ids,
            Bitmap::from_lsb0_bytes(&[0b10001010], BitmapType::Roaring)
        );
    }

    #[tokio::test]
    async fn test_index_applier_without_predicates() {
        let applier = PredicatesIndexApplier {
            fst_appliers: vec![],
        };

        let mut mock_reader: MockInvertedIndexReader = MockInvertedIndexReader::new();
        mock_reader
            .expect_metadata()
            .returning(|| Ok(mock_metas([("tag-0", 0)])));

        let output = applier
            .apply(SearchContext::default(), &mut mock_reader)
            .await
            .unwrap();
        assert_eq!(output.matched_segment_ids, Bitmap::full_bitvec(8)); // full range to scan
    }

    #[tokio::test]
    async fn test_index_applier_with_empty_index() {
        // Metadata with zero rows and no index entries at all, so "tag-0" is not found.
        let mut mock_reader = MockInvertedIndexReader::new();
        mock_reader.expect_metadata().returning(move || {
            Ok(Arc::new(InvertedIndexMetas {
                total_row_count: 0, // No rows
                segment_row_count: 1,
                ..Default::default()
            }))
        });

        // The FST applier must never run because its index is absent.
        let mut mock_fst_applier = MockFstApplier::new();
        mock_fst_applier.expect_apply().never();

        let applier = PredicatesIndexApplier {
            fst_appliers: vec![(s("tag-0"), Box::new(mock_fst_applier))],
        };

        let output = applier
            .apply(SearchContext::default(), &mut mock_reader)
            .await
            .unwrap();
        assert!(output.matched_segment_ids.is_empty());
    }

    #[tokio::test]
    async fn test_index_applier_with_nonexistent_index() {
        // 8 rows but no index entries, so the applier's "tag-0" index is missing.
        let mut mock_reader = MockInvertedIndexReader::new();
        mock_reader
            .expect_metadata()
            .returning(|| Ok(mock_metas(vec![])));

        let mut mock_fst_applier = MockFstApplier::new();
        mock_fst_applier.expect_apply().never();

        let applier = PredicatesIndexApplier {
            fst_appliers: vec![(s("tag-0"), Box::new(mock_fst_applier))],
        };

        // ThrowError: a missing index is reported as `IndexNotFound`.
        let result = applier
            .apply(
                SearchContext {
                    index_not_found_strategy: IndexNotFoundStrategy::ThrowError,
                },
                &mut mock_reader,
            )
            .await;
        assert!(matches!(result, Err(Error::IndexNotFound { .. })));

        // ReturnEmpty: nothing matches.
        let output = applier
            .apply(
                SearchContext {
                    index_not_found_strategy: IndexNotFoundStrategy::ReturnEmpty,
                },
                &mut mock_reader,
            )
            .await
            .unwrap();
        assert!(output.matched_segment_ids.is_empty());

        // Ignore: no applier is left, so all 8 segments must be scanned.
        let output = applier
            .apply(
                SearchContext {
                    index_not_found_strategy: IndexNotFoundStrategy::Ignore,
                },
                &mut mock_reader,
            )
            .await
            .unwrap();
        assert_eq!(output.matched_segment_ids, Bitmap::full_bitvec(8));
    }

    #[test]
    fn test_index_applier_memory_usage() {
        let mut mock_fst_applier = MockFstApplier::new();
        mock_fst_applier.expect_memory_usage().returning(|| 100);

        let applier = PredicatesIndexApplier {
            fst_appliers: vec![(s("tag-0"), Box::new(mock_fst_applier))],
        };

        // One vector slot, plus "tag-0" (assumes its capacity equals its length, 5),
        // plus the 100 bytes reported by the mock applier.
        assert_eq!(
            applier.memory_usage(),
            size_of::<(IndexName, Box<dyn FstApplier>)>() + 5 + 100
        );
    }
}