index/inverted_index/search/index_apply/
predicates_apply.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::mem::size_of;
16
17use async_trait::async_trait;
18use greptime_proto::v1::index::InvertedIndexMetas;
19
20use crate::bitmap::Bitmap;
21use crate::inverted_index::error::{IndexNotFoundSnafu, Result};
22use crate::inverted_index::format::reader::{InvertedIndexReadMetrics, InvertedIndexReader};
23use crate::inverted_index::search::fst_apply::{
24    FstApplier, IntersectionFstApplier, KeysFstApplier,
25};
26use crate::inverted_index::search::fst_values_mapper::ParallelFstValuesMapper;
27use crate::inverted_index::search::index_apply::{
28    ApplyOutput, IndexApplier, IndexNotFoundStrategy, SearchContext,
29};
30use crate::inverted_index::search::predicate::Predicate;
31
32type IndexName = String;
33
34/// `PredicatesIndexApplier` contains a collection of `FstApplier`s, each associated with an index name,
35/// to process and filter index data based on compiled predicates.
36pub struct PredicatesIndexApplier {
37    /// A list of `FstApplier`s, each associated with a specific index name
38    /// (e.g. a tag field uses its column name as index name)
39    fst_appliers: Vec<(IndexName, Box<dyn FstApplier>)>,
40}
41
42#[async_trait]
43impl IndexApplier for PredicatesIndexApplier {
44    /// Applies all `FstApplier`s to the data in the inverted index reader, intersecting the individual
45    /// bitmaps obtained for each index to result in a final set of indices.
46    async fn apply<'a, 'b>(
47        &self,
48        context: SearchContext,
49        reader: &mut (dyn InvertedIndexReader + 'a),
50        metrics: Option<&'b mut InvertedIndexReadMetrics>,
51    ) -> Result<ApplyOutput> {
52        let mut metrics = metrics;
53        let metadata = reader.metadata(metrics.as_deref_mut()).await?;
54        let mut output = ApplyOutput {
55            matched_segment_ids: Bitmap::new_bitvec(),
56            total_row_count: metadata.total_row_count as _,
57            segment_row_count: metadata.segment_row_count as _,
58        };
59
60        // TODO(zhongzc): optimize the order of applying to make it quicker to return empty.
61        let mut appliers = Vec::with_capacity(self.fst_appliers.len());
62        let mut fst_ranges = Vec::with_capacity(self.fst_appliers.len());
63
64        for (name, fst_applier) in &self.fst_appliers {
65            let Some(meta) = metadata.metas.get(name) else {
66                match context.index_not_found_strategy {
67                    IndexNotFoundStrategy::ReturnEmpty => {
68                        return Ok(output);
69                    }
70                    IndexNotFoundStrategy::Ignore => {
71                        continue;
72                    }
73                    IndexNotFoundStrategy::ThrowError => {
74                        return IndexNotFoundSnafu { name }.fail();
75                    }
76                }
77            };
78            let fst_offset = meta.base_offset + meta.relative_fst_offset as u64;
79            let fst_size = meta.fst_size as u64;
80            appliers.push((fst_applier, meta));
81            fst_ranges.push(fst_offset..fst_offset + fst_size);
82        }
83
84        if fst_ranges.is_empty() {
85            output.matched_segment_ids = Self::bitmap_full_range(&metadata);
86            return Ok(output);
87        }
88
89        let fsts = reader.fst_vec(&fst_ranges, metrics.as_deref_mut()).await?;
90        let value_and_meta_vec = fsts
91            .into_iter()
92            .zip(appliers)
93            .map(|(fst, (fst_applier, meta))| (fst_applier.apply(&fst), meta))
94            .collect::<Vec<_>>();
95
96        let mut mapper = ParallelFstValuesMapper::new(reader);
97        let mut bm_vec = mapper.map_values_vec(&value_and_meta_vec, metrics).await?;
98
99        let mut bitmap = bm_vec.pop().unwrap(); // SAFETY: `fst_ranges` is not empty
100        for bm in bm_vec {
101            if bm.count_ones() == 0 {
102                break;
103            }
104
105            bitmap.intersect(bm);
106        }
107
108        output.matched_segment_ids = bitmap;
109        Ok(output)
110    }
111
112    /// Returns the memory usage of the applier.
113    fn memory_usage(&self) -> usize {
114        let mut size = self.fst_appliers.capacity() * size_of::<(IndexName, Box<dyn FstApplier>)>();
115        for (name, fst_applier) in &self.fst_appliers {
116            size += name.capacity();
117            size += fst_applier.memory_usage();
118        }
119        size
120    }
121}
122
123impl PredicatesIndexApplier {
124    /// Constructs an instance of `PredicatesIndexApplier` based on a list of tag predicates.
125    /// Chooses an appropriate `FstApplier` for each index name based on the nature of its predicates.
126    pub fn try_from(mut predicates: Vec<(IndexName, Vec<Predicate>)>) -> Result<Self> {
127        let mut fst_appliers = Vec::with_capacity(predicates.len());
128
129        // InList predicates are applied first to benefit from higher selectivity.
130        let in_list_index = predicates
131            .iter_mut()
132            .partition_in_place(|(_, ps)| ps.iter().any(|p| matches!(p, Predicate::InList(_))));
133        let mut iter = predicates.into_iter();
134        for _ in 0..in_list_index {
135            let (column_name, predicates) = iter.next().unwrap();
136            let fst_applier = Box::new(KeysFstApplier::try_from(predicates)?) as _;
137            fst_appliers.push((column_name, fst_applier));
138        }
139
140        for (column_name, predicates) in iter {
141            if predicates.is_empty() {
142                continue;
143            }
144            let fst_applier = Box::new(IntersectionFstApplier::try_from(predicates)?) as _;
145            fst_appliers.push((column_name, fst_applier));
146        }
147
148        Ok(PredicatesIndexApplier { fst_appliers })
149    }
150
151    /// Creates a `Bitmap` representing the full range of data in the index for initial scanning.
152    fn bitmap_full_range(metadata: &InvertedIndexMetas) -> Bitmap {
153        let total_count = metadata.total_row_count;
154        let segment_count = metadata.segment_row_count;
155        let len = total_count.div_ceil(segment_count);
156        Bitmap::full_bitvec(len as _)
157    }
158}
159
160impl TryFrom<Vec<(String, Vec<Predicate>)>> for PredicatesIndexApplier {
161    type Error = crate::inverted_index::error::Error;
162    fn try_from(predicates: Vec<(String, Vec<Predicate>)>) -> Result<Self> {
163        Self::try_from(predicates)
164    }
165}
166
#[cfg(test)]
mod tests {
    use std::collections::VecDeque;
    use std::sync::Arc;

    use greptime_proto::v1::index::{BitmapType, InvertedIndexMeta};

    use super::*;
    use crate::bitmap::Bitmap;
    use crate::inverted_index::FstMap;
    use crate::inverted_index::error::Error;
    use crate::inverted_index::format::reader::MockInvertedIndexReader;
    use crate::inverted_index::search::fst_apply::MockFstApplier;

    /// Shorthand for an owned copy of a static string.
    fn s(s: &'static str) -> String {
        String::from(s)
    }

    /// Builds index metadata with 8 rows, 1 row per segment, and one roaring-bitmap
    /// index per `(tag, relative FST offset)` pair.
    fn mock_metas(tags: impl IntoIterator<Item = (&'static str, u32)>) -> Arc<InvertedIndexMetas> {
        let mut index_metas = InvertedIndexMetas {
            total_row_count: 8,
            segment_row_count: 1,
            ..Default::default()
        };
        index_metas
            .metas
            .extend(tags.into_iter().map(|(tag, fst_offset)| {
                let meta = InvertedIndexMeta {
                    name: s(tag),
                    relative_fst_offset: fst_offset,
                    bitmap_type: BitmapType::Roaring.into(),
                    ..Default::default()
                };
                (s(tag), meta)
            }));
        Arc::new(index_metas)
    }

    /// A mocked `FstApplier` that point-gets `value` from the FST it is applied to.
    fn key_fst_applier(value: &'static str) -> Box<dyn FstApplier> {
        let mut applier = MockFstApplier::new();
        applier
            .expect_apply()
            .returning(move |fst| fst.get(value).into_iter().collect());
        Box::new(applier)
    }

    /// A mocked `FstApplier` whose `apply` must never be called.
    fn unused_fst_applier() -> Box<dyn FstApplier> {
        let mut applier = MockFstApplier::new();
        applier.expect_apply().never();
        Box::new(applier)
    }

    /// Packs a bitmap `(offset, size)` pair into a single FST value.
    fn fst_value(offset: u32, size: u32) -> u64 {
        bytemuck::cast::<[u32; 2], u64>([offset, size])
    }

    /// Builds an FST holding exactly one key/value entry.
    fn single_entry_fst(key: &'static [u8], value: u64) -> FstMap {
        FstMap::from_iter([(key, value)]).unwrap()
    }

    #[tokio::test]
    async fn test_index_applier_apply_get_key() {
        // The applier point-gets "tag-0_value-0" on tag "tag-0".
        let applier = PredicatesIndexApplier {
            fst_appliers: vec![(s("tag-0"), key_fst_applier("tag-0_value-0"))],
        };

        // Case 1: the reader has tag "tag-0" with the value "tag-0_value-0".
        let mut reader_with_value = MockInvertedIndexReader::new();
        reader_with_value
            .expect_metadata()
            .returning(|_| Ok(mock_metas([("tag-0", 0)])));
        reader_with_value
            .expect_fst_vec()
            .returning(|_ranges, _metrics| {
                Ok(vec![single_entry_fst(b"tag-0_value-0", fst_value(2, 1))])
            });
        reader_with_value
            .expect_bitmap_deque()
            .returning(|requests, _metrics| {
                assert_eq!(requests.len(), 1);
                let (range, bitmap_type) = &requests[0];
                assert_eq!(*range, 2..3);
                assert_eq!(*bitmap_type, BitmapType::Roaring);
                let bitmap = Bitmap::from_lsb0_bytes(&[0b10101010], *bitmap_type);
                Ok(VecDeque::from([bitmap]))
            });

        let found = applier
            .apply(SearchContext::default(), &mut reader_with_value, None)
            .await
            .unwrap();
        let expected = Bitmap::from_lsb0_bytes(&[0b10101010], BitmapType::Roaring);
        assert_eq!(found.matched_segment_ids, expected);

        // Case 2: the reader has tag "tag-0" but not the value "tag-0_value-0".
        let mut reader_without_value = MockInvertedIndexReader::new();
        reader_without_value
            .expect_metadata()
            .returning(|_| Ok(mock_metas([("tag-0", 0)])));
        reader_without_value
            .expect_fst_vec()
            .returning(|_ranges, _metrics| {
                Ok(vec![single_entry_fst(b"tag-0_value-1", fst_value(2, 1))])
            });

        let not_found = applier
            .apply(SearchContext::default(), &mut reader_without_value, None)
            .await
            .unwrap();
        assert_eq!(not_found.matched_segment_ids.count_ones(), 0);
    }

    #[tokio::test]
    async fn test_index_applier_apply_intersection_with_two_tags() {
        // The applier intersects "tag-0_value-0" on "tag-0" with "tag-1_value-a" on "tag-1".
        let applier = PredicatesIndexApplier {
            fst_appliers: vec![
                (s("tag-0"), key_fst_applier("tag-0_value-0")),
                (s("tag-1"), key_fst_applier("tag-1_value-a")),
            ],
        };

        // The reader has both tags, each with the value the applier looks for.
        let mut reader = MockInvertedIndexReader::new();
        reader
            .expect_metadata()
            .returning(|_| Ok(mock_metas([("tag-0", 0), ("tag-1", 1)])));
        reader.expect_fst_vec().returning(|ranges, _metrics| {
            // The FST offset identifies which tag is being requested.
            let fsts = ranges
                .iter()
                .map(|range| match range.start {
                    0 => single_entry_fst(b"tag-0_value-0", fst_value(1, 1)),
                    1 => single_entry_fst(b"tag-1_value-a", fst_value(2, 1)),
                    _ => unreachable!(),
                })
                .collect::<Vec<_>>();
            Ok(fsts)
        });
        reader.expect_bitmap_deque().returning(|requests, _metrics| {
            let bitmaps = requests
                .iter()
                .map(|(range, bitmap_type)| {
                    let size = range.end - range.start;
                    let byte: u8 = match (range.start, size, bitmap_type) {
                        (1, 1, BitmapType::Roaring) => 0b10101010,
                        (2, 1, BitmapType::Roaring) => 0b11011011,
                        _ => unreachable!(),
                    };
                    Bitmap::from_lsb0_bytes(&[byte], *bitmap_type)
                })
                .collect::<VecDeque<_>>();
            Ok(bitmaps)
        });

        let output = applier
            .apply(SearchContext::default(), &mut reader, None)
            .await
            .unwrap();
        let expected = Bitmap::from_lsb0_bytes(&[0b10001010], BitmapType::Roaring);
        assert_eq!(output.matched_segment_ids, expected);
    }

    #[tokio::test]
    async fn test_index_applier_without_predicates() {
        let applier = PredicatesIndexApplier {
            fst_appliers: Vec::new(),
        };

        let mut reader = MockInvertedIndexReader::new();
        reader
            .expect_metadata()
            .returning(|_| Ok(mock_metas([("tag-0", 0)])));

        let output = applier
            .apply(SearchContext::default(), &mut reader, None)
            .await
            .unwrap();
        // Without predicates the full range has to be scanned.
        assert_eq!(output.matched_segment_ids, Bitmap::full_bitvec(8));
    }

    #[tokio::test]
    async fn test_index_applier_with_empty_index() {
        // Metadata without any row and without any index.
        let mut reader = MockInvertedIndexReader::new();
        reader.expect_metadata().returning(|_| {
            let empty_metas = InvertedIndexMetas {
                total_row_count: 0,
                segment_row_count: 1,
                ..Default::default()
            };
            Ok(Arc::new(empty_metas))
        });

        let applier = PredicatesIndexApplier {
            fst_appliers: vec![(s("tag-0"), unused_fst_applier())],
        };

        let output = applier
            .apply(SearchContext::default(), &mut reader, None)
            .await
            .unwrap();
        assert!(output.matched_segment_ids.is_empty());
    }

    #[tokio::test]
    async fn test_index_applier_with_nonexistent_index() {
        // The metadata knows no index at all, so "tag-0" cannot be found.
        let mut reader = MockInvertedIndexReader::new();
        reader
            .expect_metadata()
            .returning(|_| Ok(mock_metas(Vec::new())));

        let applier = PredicatesIndexApplier {
            fst_appliers: vec![(s("tag-0"), unused_fst_applier())],
        };

        // `ThrowError` surfaces the missing index as an error.
        let throw_error = SearchContext {
            index_not_found_strategy: IndexNotFoundStrategy::ThrowError,
        };
        let result = applier.apply(throw_error, &mut reader, None).await;
        assert!(matches!(result, Err(Error::IndexNotFound { .. })));

        // `ReturnEmpty` reports that nothing matches.
        let return_empty = SearchContext {
            index_not_found_strategy: IndexNotFoundStrategy::ReturnEmpty,
        };
        let output = applier
            .apply(return_empty, &mut reader, None)
            .await
            .unwrap();
        assert!(output.matched_segment_ids.is_empty());

        // `Ignore` drops the predicate, leaving the full range to scan.
        let ignore = SearchContext {
            index_not_found_strategy: IndexNotFoundStrategy::Ignore,
        };
        let output = applier.apply(ignore, &mut reader, None).await.unwrap();
        assert_eq!(output.matched_segment_ids, Bitmap::full_bitvec(8));
    }

    #[test]
    fn test_index_applier_memory_usage() {
        let mut mock_fst_applier = MockFstApplier::new();
        mock_fst_applier.expect_memory_usage().returning(|| 100);

        let applier = PredicatesIndexApplier {
            fst_appliers: vec![(s("tag-0"), Box::new(mock_fst_applier))],
        };

        // One list entry + the 5 bytes of "tag-0" + the 100 bytes reported by the mock.
        let entry_size = size_of::<(IndexName, Box<dyn FstApplier>)>();
        assert_eq!(applier.memory_usage(), entry_size + 5 + 100);
    }
}