index/inverted_index/search/index_apply/
predicates_apply.rs1use std::mem::size_of;
16
17use async_trait::async_trait;
18use greptime_proto::v1::index::InvertedIndexMetas;
19
20use crate::bitmap::Bitmap;
21use crate::inverted_index::error::{IndexNotFoundSnafu, Result};
22use crate::inverted_index::format::reader::InvertedIndexReader;
23use crate::inverted_index::search::fst_apply::{
24 FstApplier, IntersectionFstApplier, KeysFstApplier,
25};
26use crate::inverted_index::search::fst_values_mapper::ParallelFstValuesMapper;
27use crate::inverted_index::search::index_apply::{
28 ApplyOutput, IndexApplier, IndexNotFoundStrategy, SearchContext,
29};
30use crate::inverted_index::search::predicate::Predicate;
31
32type IndexName = String;
33
34pub struct PredicatesIndexApplier {
37 fst_appliers: Vec<(IndexName, Box<dyn FstApplier>)>,
40}
41
42#[async_trait]
43impl IndexApplier for PredicatesIndexApplier {
44 async fn apply<'a>(
47 &self,
48 context: SearchContext,
49 reader: &mut (dyn InvertedIndexReader + 'a),
50 ) -> Result<ApplyOutput> {
51 let metadata = reader.metadata().await?;
52 let mut output = ApplyOutput {
53 matched_segment_ids: Bitmap::new_bitvec(),
54 total_row_count: metadata.total_row_count as _,
55 segment_row_count: metadata.segment_row_count as _,
56 };
57
58 let mut appliers = Vec::with_capacity(self.fst_appliers.len());
60 let mut fst_ranges = Vec::with_capacity(self.fst_appliers.len());
61
62 for (name, fst_applier) in &self.fst_appliers {
63 let Some(meta) = metadata.metas.get(name) else {
64 match context.index_not_found_strategy {
65 IndexNotFoundStrategy::ReturnEmpty => {
66 return Ok(output);
67 }
68 IndexNotFoundStrategy::Ignore => {
69 continue;
70 }
71 IndexNotFoundStrategy::ThrowError => {
72 return IndexNotFoundSnafu { name }.fail();
73 }
74 }
75 };
76 let fst_offset = meta.base_offset + meta.relative_fst_offset as u64;
77 let fst_size = meta.fst_size as u64;
78 appliers.push((fst_applier, meta));
79 fst_ranges.push(fst_offset..fst_offset + fst_size);
80 }
81
82 if fst_ranges.is_empty() {
83 output.matched_segment_ids = Self::bitmap_full_range(&metadata);
84 return Ok(output);
85 }
86
87 let fsts = reader.fst_vec(&fst_ranges).await?;
88 let value_and_meta_vec = fsts
89 .into_iter()
90 .zip(appliers)
91 .map(|(fst, (fst_applier, meta))| (fst_applier.apply(&fst), meta))
92 .collect::<Vec<_>>();
93
94 let mut mapper = ParallelFstValuesMapper::new(reader);
95 let mut bm_vec = mapper.map_values_vec(&value_and_meta_vec).await?;
96
97 let mut bitmap = bm_vec.pop().unwrap(); for bm in bm_vec {
99 if bm.count_ones() == 0 {
100 break;
101 }
102
103 bitmap.intersect(bm);
104 }
105
106 output.matched_segment_ids = bitmap;
107 Ok(output)
108 }
109
110 fn memory_usage(&self) -> usize {
112 let mut size = self.fst_appliers.capacity() * size_of::<(IndexName, Box<dyn FstApplier>)>();
113 for (name, fst_applier) in &self.fst_appliers {
114 size += name.capacity();
115 size += fst_applier.memory_usage();
116 }
117 size
118 }
119}
120
121impl PredicatesIndexApplier {
122 pub fn try_from(mut predicates: Vec<(IndexName, Vec<Predicate>)>) -> Result<Self> {
125 let mut fst_appliers = Vec::with_capacity(predicates.len());
126
127 let in_list_index = predicates
129 .iter_mut()
130 .partition_in_place(|(_, ps)| ps.iter().any(|p| matches!(p, Predicate::InList(_))));
131 let mut iter = predicates.into_iter();
132 for _ in 0..in_list_index {
133 let (column_name, predicates) = iter.next().unwrap();
134 let fst_applier = Box::new(KeysFstApplier::try_from(predicates)?) as _;
135 fst_appliers.push((column_name, fst_applier));
136 }
137
138 for (column_name, predicates) in iter {
139 if predicates.is_empty() {
140 continue;
141 }
142 let fst_applier = Box::new(IntersectionFstApplier::try_from(predicates)?) as _;
143 fst_appliers.push((column_name, fst_applier));
144 }
145
146 Ok(PredicatesIndexApplier { fst_appliers })
147 }
148
149 fn bitmap_full_range(metadata: &InvertedIndexMetas) -> Bitmap {
151 let total_count = metadata.total_row_count;
152 let segment_count = metadata.segment_row_count;
153 let len = total_count.div_ceil(segment_count);
154 Bitmap::full_bitvec(len as _)
155 }
156}
157
158impl TryFrom<Vec<(String, Vec<Predicate>)>> for PredicatesIndexApplier {
159 type Error = crate::inverted_index::error::Error;
160 fn try_from(predicates: Vec<(String, Vec<Predicate>)>) -> Result<Self> {
161 Self::try_from(predicates)
162 }
163}
164
165#[cfg(test)]
166mod tests {
167 use std::collections::VecDeque;
168 use std::sync::Arc;
169
170 use greptime_proto::v1::index::{BitmapType, InvertedIndexMeta};
171
172 use super::*;
173 use crate::bitmap::Bitmap;
174 use crate::inverted_index::error::Error;
175 use crate::inverted_index::format::reader::MockInvertedIndexReader;
176 use crate::inverted_index::search::fst_apply::MockFstApplier;
177 use crate::inverted_index::FstMap;
178
179 fn s(s: &'static str) -> String {
180 s.to_owned()
181 }
182
183 fn mock_metas(tags: impl IntoIterator<Item = (&'static str, u32)>) -> Arc<InvertedIndexMetas> {
184 let mut metas = InvertedIndexMetas {
185 total_row_count: 8,
186 segment_row_count: 1,
187 ..Default::default()
188 };
189 for (tag, idx) in tags.into_iter() {
190 let meta = InvertedIndexMeta {
191 name: s(tag),
192 relative_fst_offset: idx,
193 bitmap_type: BitmapType::Roaring.into(),
194 ..Default::default()
195 };
196 metas.metas.insert(s(tag), meta);
197 }
198 Arc::new(metas)
199 }
200
201 fn key_fst_applier(value: &'static str) -> Box<dyn FstApplier> {
202 let mut mock_fst_applier = MockFstApplier::new();
203 mock_fst_applier
204 .expect_apply()
205 .returning(move |fst| fst.get(value).into_iter().collect());
206 Box::new(mock_fst_applier)
207 }
208
209 fn fst_value(offset: u32, size: u32) -> u64 {
210 bytemuck::cast::<_, u64>([offset, size])
211 }
212
213 #[tokio::test]
214 async fn test_index_applier_apply_get_key() {
215 let applier = PredicatesIndexApplier {
217 fst_appliers: vec![(s("tag-0"), key_fst_applier("tag-0_value-0"))],
218 };
219
220 let mut mock_reader = MockInvertedIndexReader::new();
222 mock_reader
223 .expect_metadata()
224 .returning(|| Ok(mock_metas([("tag-0", 0)])));
225 mock_reader.expect_fst_vec().returning(|_ranges| {
226 Ok(vec![FstMap::from_iter([(
227 b"tag-0_value-0",
228 fst_value(2, 1),
229 )])
230 .unwrap()])
231 });
232
233 mock_reader.expect_bitmap_deque().returning(|arg| {
234 assert_eq!(arg.len(), 1);
235 let range = &arg[0].0;
236 let bitmap_type = arg[0].1;
237 assert_eq!(*range, 2..3);
238 assert_eq!(bitmap_type, BitmapType::Roaring);
239 Ok(VecDeque::from([Bitmap::from_lsb0_bytes(
240 &[0b10101010],
241 bitmap_type,
242 )]))
243 });
244 let output = applier
245 .apply(SearchContext::default(), &mut mock_reader)
246 .await
247 .unwrap();
248 assert_eq!(
249 output.matched_segment_ids,
250 Bitmap::from_lsb0_bytes(&[0b10101010], BitmapType::Roaring)
251 );
252
253 let mut mock_reader = MockInvertedIndexReader::new();
255 mock_reader
256 .expect_metadata()
257 .returning(|| Ok(mock_metas([("tag-0", 0)])));
258 mock_reader.expect_fst_vec().returning(|_range| {
259 Ok(vec![FstMap::from_iter([(
260 b"tag-0_value-1",
261 fst_value(2, 1),
262 )])
263 .unwrap()])
264 });
265 let output = applier
266 .apply(SearchContext::default(), &mut mock_reader)
267 .await
268 .unwrap();
269 assert_eq!(output.matched_segment_ids.count_ones(), 0);
270 }
271
272 #[tokio::test]
273 async fn test_index_applier_apply_intersection_with_two_tags() {
274 let applier = PredicatesIndexApplier {
276 fst_appliers: vec![
277 (s("tag-0"), key_fst_applier("tag-0_value-0")),
278 (s("tag-1"), key_fst_applier("tag-1_value-a")),
279 ],
280 };
281
282 let mut mock_reader = MockInvertedIndexReader::new();
284 mock_reader
285 .expect_metadata()
286 .returning(|| Ok(mock_metas([("tag-0", 0), ("tag-1", 1)])));
287 mock_reader.expect_fst_vec().returning(|ranges| {
288 let mut output = vec![];
289 for range in ranges {
290 match range.start {
291 0 => output
292 .push(FstMap::from_iter([(b"tag-0_value-0", fst_value(1, 1))]).unwrap()),
293 1 => output
294 .push(FstMap::from_iter([(b"tag-1_value-a", fst_value(2, 1))]).unwrap()),
295 _ => unreachable!(),
296 }
297 }
298 Ok(output)
299 });
300 mock_reader.expect_bitmap_deque().returning(|ranges| {
301 let mut output = VecDeque::new();
302 for (range, bitmap_type) in ranges {
303 let offset = range.start;
304 let size = range.end - range.start;
305 match (offset, size, bitmap_type) {
306 (1, 1, BitmapType::Roaring) => {
307 output.push_back(Bitmap::from_lsb0_bytes(&[0b10101010], *bitmap_type))
308 }
309 (2, 1, BitmapType::Roaring) => {
310 output.push_back(Bitmap::from_lsb0_bytes(&[0b11011011], *bitmap_type))
311 }
312 _ => unreachable!(),
313 }
314 }
315
316 Ok(output)
317 });
318
319 let output = applier
320 .apply(SearchContext::default(), &mut mock_reader)
321 .await
322 .unwrap();
323 assert_eq!(
324 output.matched_segment_ids,
325 Bitmap::from_lsb0_bytes(&[0b10001010], BitmapType::Roaring)
326 );
327 }
328
329 #[tokio::test]
330 async fn test_index_applier_without_predicates() {
331 let applier = PredicatesIndexApplier {
332 fst_appliers: vec![],
333 };
334
335 let mut mock_reader: MockInvertedIndexReader = MockInvertedIndexReader::new();
336 mock_reader
337 .expect_metadata()
338 .returning(|| Ok(mock_metas([("tag-0", 0)])));
339
340 let output = applier
341 .apply(SearchContext::default(), &mut mock_reader)
342 .await
343 .unwrap();
344 assert_eq!(output.matched_segment_ids, Bitmap::full_bitvec(8)); }
346
347 #[tokio::test]
348 async fn test_index_applier_with_empty_index() {
349 let mut mock_reader = MockInvertedIndexReader::new();
350 mock_reader.expect_metadata().returning(move || {
351 Ok(Arc::new(InvertedIndexMetas {
352 total_row_count: 0, segment_row_count: 1,
354 ..Default::default()
355 }))
356 });
357
358 let mut mock_fst_applier = MockFstApplier::new();
359 mock_fst_applier.expect_apply().never();
360
361 let applier = PredicatesIndexApplier {
362 fst_appliers: vec![(s("tag-0"), Box::new(mock_fst_applier))],
363 };
364
365 let output = applier
366 .apply(SearchContext::default(), &mut mock_reader)
367 .await
368 .unwrap();
369 assert!(output.matched_segment_ids.is_empty());
370 }
371
372 #[tokio::test]
373 async fn test_index_applier_with_nonexistent_index() {
374 let mut mock_reader = MockInvertedIndexReader::new();
375 mock_reader
376 .expect_metadata()
377 .returning(|| Ok(mock_metas(vec![])));
378
379 let mut mock_fst_applier = MockFstApplier::new();
380 mock_fst_applier.expect_apply().never();
381
382 let applier = PredicatesIndexApplier {
383 fst_appliers: vec![(s("tag-0"), Box::new(mock_fst_applier))],
384 };
385
386 let result = applier
387 .apply(
388 SearchContext {
389 index_not_found_strategy: IndexNotFoundStrategy::ThrowError,
390 },
391 &mut mock_reader,
392 )
393 .await;
394 assert!(matches!(result, Err(Error::IndexNotFound { .. })));
395
396 let output = applier
397 .apply(
398 SearchContext {
399 index_not_found_strategy: IndexNotFoundStrategy::ReturnEmpty,
400 },
401 &mut mock_reader,
402 )
403 .await
404 .unwrap();
405 assert!(output.matched_segment_ids.is_empty());
406
407 let output = applier
408 .apply(
409 SearchContext {
410 index_not_found_strategy: IndexNotFoundStrategy::Ignore,
411 },
412 &mut mock_reader,
413 )
414 .await
415 .unwrap();
416 assert_eq!(output.matched_segment_ids, Bitmap::full_bitvec(8));
417 }
418
419 #[test]
420 fn test_index_applier_memory_usage() {
421 let mut mock_fst_applier = MockFstApplier::new();
422 mock_fst_applier.expect_memory_usage().returning(|| 100);
423
424 let applier = PredicatesIndexApplier {
425 fst_appliers: vec![(s("tag-0"), Box::new(mock_fst_applier))],
426 };
427
428 assert_eq!(
429 applier.memory_usage(),
430 size_of::<(IndexName, Box<dyn FstApplier>)>() + 5 + 100
431 );
432 }
433}