index/inverted_index/search/index_apply/
predicates_apply.rs1use std::mem::size_of;
16
17use async_trait::async_trait;
18use greptime_proto::v1::index::InvertedIndexMetas;
19
20use crate::bitmap::Bitmap;
21use crate::inverted_index::error::{IndexNotFoundSnafu, Result};
22use crate::inverted_index::format::reader::InvertedIndexReader;
23use crate::inverted_index::search::fst_apply::{
24 FstApplier, IntersectionFstApplier, KeysFstApplier,
25};
26use crate::inverted_index::search::fst_values_mapper::ParallelFstValuesMapper;
27use crate::inverted_index::search::index_apply::{
28 ApplyOutput, IndexApplier, IndexNotFoundStrategy, SearchContext,
29};
30use crate::inverted_index::search::predicate::Predicate;
31
32type IndexName = String;
33
34pub struct PredicatesIndexApplier {
37 fst_appliers: Vec<(IndexName, Box<dyn FstApplier>)>,
40}
41
42#[async_trait]
43impl IndexApplier for PredicatesIndexApplier {
44 async fn apply<'a>(
47 &self,
48 context: SearchContext,
49 reader: &mut (dyn InvertedIndexReader + 'a),
50 ) -> Result<ApplyOutput> {
51 let metadata = reader.metadata().await?;
52 let mut output = ApplyOutput {
53 matched_segment_ids: Bitmap::new_bitvec(),
54 total_row_count: metadata.total_row_count as _,
55 segment_row_count: metadata.segment_row_count as _,
56 };
57
58 let mut appliers = Vec::with_capacity(self.fst_appliers.len());
60 let mut fst_ranges = Vec::with_capacity(self.fst_appliers.len());
61
62 for (name, fst_applier) in &self.fst_appliers {
63 let Some(meta) = metadata.metas.get(name) else {
64 match context.index_not_found_strategy {
65 IndexNotFoundStrategy::ReturnEmpty => {
66 return Ok(output);
67 }
68 IndexNotFoundStrategy::Ignore => {
69 continue;
70 }
71 IndexNotFoundStrategy::ThrowError => {
72 return IndexNotFoundSnafu { name }.fail();
73 }
74 }
75 };
76 let fst_offset = meta.base_offset + meta.relative_fst_offset as u64;
77 let fst_size = meta.fst_size as u64;
78 appliers.push((fst_applier, meta));
79 fst_ranges.push(fst_offset..fst_offset + fst_size);
80 }
81
82 if fst_ranges.is_empty() {
83 output.matched_segment_ids = Self::bitmap_full_range(&metadata);
84 return Ok(output);
85 }
86
87 let fsts = reader.fst_vec(&fst_ranges).await?;
88 let value_and_meta_vec = fsts
89 .into_iter()
90 .zip(appliers)
91 .map(|(fst, (fst_applier, meta))| (fst_applier.apply(&fst), meta))
92 .collect::<Vec<_>>();
93
94 let mut mapper = ParallelFstValuesMapper::new(reader);
95 let mut bm_vec = mapper.map_values_vec(&value_and_meta_vec).await?;
96
97 let mut bitmap = bm_vec.pop().unwrap(); for bm in bm_vec {
99 if bm.count_ones() == 0 {
100 break;
101 }
102
103 bitmap.intersect(bm);
104 }
105
106 output.matched_segment_ids = bitmap;
107 Ok(output)
108 }
109
110 fn memory_usage(&self) -> usize {
112 let mut size = self.fst_appliers.capacity() * size_of::<(IndexName, Box<dyn FstApplier>)>();
113 for (name, fst_applier) in &self.fst_appliers {
114 size += name.capacity();
115 size += fst_applier.memory_usage();
116 }
117 size
118 }
119}
120
121impl PredicatesIndexApplier {
122 pub fn try_from(mut predicates: Vec<(IndexName, Vec<Predicate>)>) -> Result<Self> {
125 let mut fst_appliers = Vec::with_capacity(predicates.len());
126
127 let in_list_index = predicates
129 .iter_mut()
130 .partition_in_place(|(_, ps)| ps.iter().any(|p| matches!(p, Predicate::InList(_))));
131 let mut iter = predicates.into_iter();
132 for _ in 0..in_list_index {
133 let (column_name, predicates) = iter.next().unwrap();
134 let fst_applier = Box::new(KeysFstApplier::try_from(predicates)?) as _;
135 fst_appliers.push((column_name, fst_applier));
136 }
137
138 for (column_name, predicates) in iter {
139 if predicates.is_empty() {
140 continue;
141 }
142 let fst_applier = Box::new(IntersectionFstApplier::try_from(predicates)?) as _;
143 fst_appliers.push((column_name, fst_applier));
144 }
145
146 Ok(PredicatesIndexApplier { fst_appliers })
147 }
148
149 fn bitmap_full_range(metadata: &InvertedIndexMetas) -> Bitmap {
151 let total_count = metadata.total_row_count;
152 let segment_count = metadata.segment_row_count;
153 let len = total_count.div_ceil(segment_count);
154 Bitmap::full_bitvec(len as _)
155 }
156}
157
158impl TryFrom<Vec<(String, Vec<Predicate>)>> for PredicatesIndexApplier {
159 type Error = crate::inverted_index::error::Error;
160 fn try_from(predicates: Vec<(String, Vec<Predicate>)>) -> Result<Self> {
161 Self::try_from(predicates)
162 }
163}
164
165#[cfg(test)]
166mod tests {
167 use std::collections::VecDeque;
168 use std::sync::Arc;
169
170 use greptime_proto::v1::index::{BitmapType, InvertedIndexMeta};
171
172 use super::*;
173 use crate::bitmap::Bitmap;
174 use crate::inverted_index::FstMap;
175 use crate::inverted_index::error::Error;
176 use crate::inverted_index::format::reader::MockInvertedIndexReader;
177 use crate::inverted_index::search::fst_apply::MockFstApplier;
178
179 fn s(s: &'static str) -> String {
180 s.to_owned()
181 }
182
183 fn mock_metas(tags: impl IntoIterator<Item = (&'static str, u32)>) -> Arc<InvertedIndexMetas> {
184 let mut metas = InvertedIndexMetas {
185 total_row_count: 8,
186 segment_row_count: 1,
187 ..Default::default()
188 };
189 for (tag, idx) in tags.into_iter() {
190 let meta = InvertedIndexMeta {
191 name: s(tag),
192 relative_fst_offset: idx,
193 bitmap_type: BitmapType::Roaring.into(),
194 ..Default::default()
195 };
196 metas.metas.insert(s(tag), meta);
197 }
198 Arc::new(metas)
199 }
200
201 fn key_fst_applier(value: &'static str) -> Box<dyn FstApplier> {
202 let mut mock_fst_applier = MockFstApplier::new();
203 mock_fst_applier
204 .expect_apply()
205 .returning(move |fst| fst.get(value).into_iter().collect());
206 Box::new(mock_fst_applier)
207 }
208
209 fn fst_value(offset: u32, size: u32) -> u64 {
210 bytemuck::cast::<_, u64>([offset, size])
211 }
212
213 #[tokio::test]
214 async fn test_index_applier_apply_get_key() {
215 let applier = PredicatesIndexApplier {
217 fst_appliers: vec![(s("tag-0"), key_fst_applier("tag-0_value-0"))],
218 };
219
220 let mut mock_reader = MockInvertedIndexReader::new();
222 mock_reader
223 .expect_metadata()
224 .returning(|| Ok(mock_metas([("tag-0", 0)])));
225 mock_reader.expect_fst_vec().returning(|_ranges| {
226 Ok(vec![
227 FstMap::from_iter([(b"tag-0_value-0", fst_value(2, 1))]).unwrap(),
228 ])
229 });
230
231 mock_reader.expect_bitmap_deque().returning(|arg| {
232 assert_eq!(arg.len(), 1);
233 let range = &arg[0].0;
234 let bitmap_type = arg[0].1;
235 assert_eq!(*range, 2..3);
236 assert_eq!(bitmap_type, BitmapType::Roaring);
237 Ok(VecDeque::from([Bitmap::from_lsb0_bytes(
238 &[0b10101010],
239 bitmap_type,
240 )]))
241 });
242 let output = applier
243 .apply(SearchContext::default(), &mut mock_reader)
244 .await
245 .unwrap();
246 assert_eq!(
247 output.matched_segment_ids,
248 Bitmap::from_lsb0_bytes(&[0b10101010], BitmapType::Roaring)
249 );
250
251 let mut mock_reader = MockInvertedIndexReader::new();
253 mock_reader
254 .expect_metadata()
255 .returning(|| Ok(mock_metas([("tag-0", 0)])));
256 mock_reader.expect_fst_vec().returning(|_range| {
257 Ok(vec![
258 FstMap::from_iter([(b"tag-0_value-1", fst_value(2, 1))]).unwrap(),
259 ])
260 });
261 let output = applier
262 .apply(SearchContext::default(), &mut mock_reader)
263 .await
264 .unwrap();
265 assert_eq!(output.matched_segment_ids.count_ones(), 0);
266 }
267
268 #[tokio::test]
269 async fn test_index_applier_apply_intersection_with_two_tags() {
270 let applier = PredicatesIndexApplier {
272 fst_appliers: vec![
273 (s("tag-0"), key_fst_applier("tag-0_value-0")),
274 (s("tag-1"), key_fst_applier("tag-1_value-a")),
275 ],
276 };
277
278 let mut mock_reader = MockInvertedIndexReader::new();
280 mock_reader
281 .expect_metadata()
282 .returning(|| Ok(mock_metas([("tag-0", 0), ("tag-1", 1)])));
283 mock_reader.expect_fst_vec().returning(|ranges| {
284 let mut output = vec![];
285 for range in ranges {
286 match range.start {
287 0 => output
288 .push(FstMap::from_iter([(b"tag-0_value-0", fst_value(1, 1))]).unwrap()),
289 1 => output
290 .push(FstMap::from_iter([(b"tag-1_value-a", fst_value(2, 1))]).unwrap()),
291 _ => unreachable!(),
292 }
293 }
294 Ok(output)
295 });
296 mock_reader.expect_bitmap_deque().returning(|ranges| {
297 let mut output = VecDeque::new();
298 for (range, bitmap_type) in ranges {
299 let offset = range.start;
300 let size = range.end - range.start;
301 match (offset, size, bitmap_type) {
302 (1, 1, BitmapType::Roaring) => {
303 output.push_back(Bitmap::from_lsb0_bytes(&[0b10101010], *bitmap_type))
304 }
305 (2, 1, BitmapType::Roaring) => {
306 output.push_back(Bitmap::from_lsb0_bytes(&[0b11011011], *bitmap_type))
307 }
308 _ => unreachable!(),
309 }
310 }
311
312 Ok(output)
313 });
314
315 let output = applier
316 .apply(SearchContext::default(), &mut mock_reader)
317 .await
318 .unwrap();
319 assert_eq!(
320 output.matched_segment_ids,
321 Bitmap::from_lsb0_bytes(&[0b10001010], BitmapType::Roaring)
322 );
323 }
324
325 #[tokio::test]
326 async fn test_index_applier_without_predicates() {
327 let applier = PredicatesIndexApplier {
328 fst_appliers: vec![],
329 };
330
331 let mut mock_reader: MockInvertedIndexReader = MockInvertedIndexReader::new();
332 mock_reader
333 .expect_metadata()
334 .returning(|| Ok(mock_metas([("tag-0", 0)])));
335
336 let output = applier
337 .apply(SearchContext::default(), &mut mock_reader)
338 .await
339 .unwrap();
340 assert_eq!(output.matched_segment_ids, Bitmap::full_bitvec(8)); }
342
343 #[tokio::test]
344 async fn test_index_applier_with_empty_index() {
345 let mut mock_reader = MockInvertedIndexReader::new();
346 mock_reader.expect_metadata().returning(move || {
347 Ok(Arc::new(InvertedIndexMetas {
348 total_row_count: 0, segment_row_count: 1,
350 ..Default::default()
351 }))
352 });
353
354 let mut mock_fst_applier = MockFstApplier::new();
355 mock_fst_applier.expect_apply().never();
356
357 let applier = PredicatesIndexApplier {
358 fst_appliers: vec![(s("tag-0"), Box::new(mock_fst_applier))],
359 };
360
361 let output = applier
362 .apply(SearchContext::default(), &mut mock_reader)
363 .await
364 .unwrap();
365 assert!(output.matched_segment_ids.is_empty());
366 }
367
368 #[tokio::test]
369 async fn test_index_applier_with_nonexistent_index() {
370 let mut mock_reader = MockInvertedIndexReader::new();
371 mock_reader
372 .expect_metadata()
373 .returning(|| Ok(mock_metas(vec![])));
374
375 let mut mock_fst_applier = MockFstApplier::new();
376 mock_fst_applier.expect_apply().never();
377
378 let applier = PredicatesIndexApplier {
379 fst_appliers: vec![(s("tag-0"), Box::new(mock_fst_applier))],
380 };
381
382 let result = applier
383 .apply(
384 SearchContext {
385 index_not_found_strategy: IndexNotFoundStrategy::ThrowError,
386 },
387 &mut mock_reader,
388 )
389 .await;
390 assert!(matches!(result, Err(Error::IndexNotFound { .. })));
391
392 let output = applier
393 .apply(
394 SearchContext {
395 index_not_found_strategy: IndexNotFoundStrategy::ReturnEmpty,
396 },
397 &mut mock_reader,
398 )
399 .await
400 .unwrap();
401 assert!(output.matched_segment_ids.is_empty());
402
403 let output = applier
404 .apply(
405 SearchContext {
406 index_not_found_strategy: IndexNotFoundStrategy::Ignore,
407 },
408 &mut mock_reader,
409 )
410 .await
411 .unwrap();
412 assert_eq!(output.matched_segment_ids, Bitmap::full_bitvec(8));
413 }
414
415 #[test]
416 fn test_index_applier_memory_usage() {
417 let mut mock_fst_applier = MockFstApplier::new();
418 mock_fst_applier.expect_memory_usage().returning(|| 100);
419
420 let applier = PredicatesIndexApplier {
421 fst_appliers: vec![(s("tag-0"), Box::new(mock_fst_applier))],
422 };
423
424 assert_eq!(
425 applier.memory_usage(),
426 size_of::<(IndexName, Box<dyn FstApplier>)>() + 5 + 100
427 );
428 }
429}