index/inverted_index/search/index_apply/
predicates_apply.rs1use std::mem::size_of;
16
17use async_trait::async_trait;
18use greptime_proto::v1::index::InvertedIndexMetas;
19
20use crate::bitmap::Bitmap;
21use crate::inverted_index::error::{IndexNotFoundSnafu, Result};
22use crate::inverted_index::format::reader::{InvertedIndexReadMetrics, InvertedIndexReader};
23use crate::inverted_index::search::fst_apply::{
24 FstApplier, IntersectionFstApplier, KeysFstApplier,
25};
26use crate::inverted_index::search::fst_values_mapper::ParallelFstValuesMapper;
27use crate::inverted_index::search::index_apply::{
28 ApplyOutput, IndexApplier, IndexNotFoundStrategy, SearchContext,
29};
30use crate::inverted_index::search::predicate::Predicate;
31
/// Name under which an index is registered; used as the lookup key into the
/// per-index entries (`metas`) of the inverted index metadata.
type IndexName = String;
33
34pub struct PredicatesIndexApplier {
37 fst_appliers: Vec<(IndexName, Box<dyn FstApplier>)>,
40}
41
42#[async_trait]
43impl IndexApplier for PredicatesIndexApplier {
44 async fn apply<'a, 'b>(
47 &self,
48 context: SearchContext,
49 reader: &mut (dyn InvertedIndexReader + 'a),
50 metrics: Option<&'b mut InvertedIndexReadMetrics>,
51 ) -> Result<ApplyOutput> {
52 let mut metrics = metrics;
53 let metadata = reader.metadata(metrics.as_deref_mut()).await?;
54 let mut output = ApplyOutput {
55 matched_segment_ids: Bitmap::new_bitvec(),
56 total_row_count: metadata.total_row_count as _,
57 segment_row_count: metadata.segment_row_count as _,
58 };
59
60 let mut appliers = Vec::with_capacity(self.fst_appliers.len());
62 let mut fst_ranges = Vec::with_capacity(self.fst_appliers.len());
63
64 for (name, fst_applier) in &self.fst_appliers {
65 let Some(meta) = metadata.metas.get(name) else {
66 match context.index_not_found_strategy {
67 IndexNotFoundStrategy::ReturnEmpty => {
68 return Ok(output);
69 }
70 IndexNotFoundStrategy::Ignore => {
71 continue;
72 }
73 IndexNotFoundStrategy::ThrowError => {
74 return IndexNotFoundSnafu { name }.fail();
75 }
76 }
77 };
78 let fst_offset = meta.base_offset + meta.relative_fst_offset as u64;
79 let fst_size = meta.fst_size as u64;
80 appliers.push((fst_applier, meta));
81 fst_ranges.push(fst_offset..fst_offset + fst_size);
82 }
83
84 if fst_ranges.is_empty() {
85 output.matched_segment_ids = Self::bitmap_full_range(&metadata);
86 return Ok(output);
87 }
88
89 let fsts = reader.fst_vec(&fst_ranges, metrics.as_deref_mut()).await?;
90 let value_and_meta_vec = fsts
91 .into_iter()
92 .zip(appliers)
93 .map(|(fst, (fst_applier, meta))| (fst_applier.apply(&fst), meta))
94 .collect::<Vec<_>>();
95
96 let mut mapper = ParallelFstValuesMapper::new(reader);
97 let mut bm_vec = mapper.map_values_vec(&value_and_meta_vec, metrics).await?;
98
99 let mut bitmap = bm_vec.pop().unwrap(); for bm in bm_vec {
101 if bm.count_ones() == 0 {
102 break;
103 }
104
105 bitmap.intersect(bm);
106 }
107
108 output.matched_segment_ids = bitmap;
109 Ok(output)
110 }
111
112 fn memory_usage(&self) -> usize {
114 let mut size = self.fst_appliers.capacity() * size_of::<(IndexName, Box<dyn FstApplier>)>();
115 for (name, fst_applier) in &self.fst_appliers {
116 size += name.capacity();
117 size += fst_applier.memory_usage();
118 }
119 size
120 }
121}
122
123impl PredicatesIndexApplier {
124 pub fn try_from(mut predicates: Vec<(IndexName, Vec<Predicate>)>) -> Result<Self> {
127 let mut fst_appliers = Vec::with_capacity(predicates.len());
128
129 let in_list_index = predicates
131 .iter_mut()
132 .partition_in_place(|(_, ps)| ps.iter().any(|p| matches!(p, Predicate::InList(_))));
133 let mut iter = predicates.into_iter();
134 for _ in 0..in_list_index {
135 let (column_name, predicates) = iter.next().unwrap();
136 let fst_applier = Box::new(KeysFstApplier::try_from(predicates)?) as _;
137 fst_appliers.push((column_name, fst_applier));
138 }
139
140 for (column_name, predicates) in iter {
141 if predicates.is_empty() {
142 continue;
143 }
144 let fst_applier = Box::new(IntersectionFstApplier::try_from(predicates)?) as _;
145 fst_appliers.push((column_name, fst_applier));
146 }
147
148 Ok(PredicatesIndexApplier { fst_appliers })
149 }
150
151 fn bitmap_full_range(metadata: &InvertedIndexMetas) -> Bitmap {
153 let total_count = metadata.total_row_count;
154 let segment_count = metadata.segment_row_count;
155 let len = total_count.div_ceil(segment_count);
156 Bitmap::full_bitvec(len as _)
157 }
158}
159
160impl TryFrom<Vec<(String, Vec<Predicate>)>> for PredicatesIndexApplier {
161 type Error = crate::inverted_index::error::Error;
162 fn try_from(predicates: Vec<(String, Vec<Predicate>)>) -> Result<Self> {
163 Self::try_from(predicates)
164 }
165}
166
#[cfg(test)]
mod tests {
    use std::collections::VecDeque;
    use std::sync::Arc;

    use greptime_proto::v1::index::{BitmapType, InvertedIndexMeta};

    use super::*;
    use crate::bitmap::Bitmap;
    use crate::inverted_index::FstMap;
    use crate::inverted_index::error::Error;
    use crate::inverted_index::format::reader::MockInvertedIndexReader;
    use crate::inverted_index::search::fst_apply::MockFstApplier;

    /// Shorthand for turning a string literal into an owned `String`.
    fn s(text: &'static str) -> String {
        String::from(text)
    }

    /// Builds index metadata with 8 rows and 1 row per segment, containing one
    /// Roaring-typed index meta per `(tag, relative_fst_offset)` pair.
    fn mock_metas(tags: impl IntoIterator<Item = (&'static str, u32)>) -> Arc<InvertedIndexMetas> {
        let mut index_metas = InvertedIndexMetas {
            total_row_count: 8,
            segment_row_count: 1,
            ..Default::default()
        };
        index_metas
            .metas
            .extend(tags.into_iter().map(|(tag, fst_offset)| {
                let meta = InvertedIndexMeta {
                    name: s(tag),
                    relative_fst_offset: fst_offset,
                    bitmap_type: BitmapType::Roaring.into(),
                    ..Default::default()
                };
                (s(tag), meta)
            }));
        Arc::new(index_metas)
    }

    /// Mock applier that looks up exactly `value` in the FST and returns the
    /// associated FST value if the key is present.
    fn key_fst_applier(value: &'static str) -> Box<dyn FstApplier> {
        let mut applier = MockFstApplier::new();
        applier
            .expect_apply()
            .returning(move |fst| fst.get(value).into_iter().collect());
        Box::new(applier)
    }

    /// Reinterprets the `[offset, size]` pair as a single `u64` FST value.
    fn fst_value(offset: u32, size: u32) -> u64 {
        let parts = [offset, size];
        bytemuck::cast::<_, u64>(parts)
    }

    #[tokio::test]
    async fn test_index_applier_apply_get_key() {
        let applier = PredicatesIndexApplier {
            fst_appliers: vec![(s("tag-0"), key_fst_applier("tag-0_value-0"))],
        };

        // Case 1: the FST contains the requested key, so its bitmap is read
        // from range 2..3 and returned unchanged.
        let mut hit_reader = MockInvertedIndexReader::new();
        hit_reader
            .expect_metadata()
            .returning(|_| Ok(mock_metas([("tag-0", 0)])));
        hit_reader.expect_fst_vec().returning(|_ranges, _metrics| {
            let fst = FstMap::from_iter([(b"tag-0_value-0", fst_value(2, 1))]).unwrap();
            Ok(vec![fst])
        });
        hit_reader
            .expect_bitmap_deque()
            .returning(|requests, _metrics| {
                assert_eq!(requests.len(), 1);
                let (range, bitmap_type) = &requests[0];
                assert_eq!(*range, 2..3);
                assert_eq!(*bitmap_type, BitmapType::Roaring);
                let bitmap = Bitmap::from_lsb0_bytes(&[0b10101010], *bitmap_type);
                Ok(VecDeque::from([bitmap]))
            });

        let hit = applier
            .apply(SearchContext::default(), &mut hit_reader, None)
            .await
            .unwrap();
        let expected = Bitmap::from_lsb0_bytes(&[0b10101010], BitmapType::Roaring);
        assert_eq!(hit.matched_segment_ids, expected);

        // Case 2: the FST only holds a different key, so nothing matches.
        let mut miss_reader = MockInvertedIndexReader::new();
        miss_reader
            .expect_metadata()
            .returning(|_| Ok(mock_metas([("tag-0", 0)])));
        miss_reader.expect_fst_vec().returning(|_ranges, _metrics| {
            let fst = FstMap::from_iter([(b"tag-0_value-1", fst_value(2, 1))]).unwrap();
            Ok(vec![fst])
        });

        let miss = applier
            .apply(SearchContext::default(), &mut miss_reader, None)
            .await
            .unwrap();
        assert_eq!(miss.matched_segment_ids.count_ones(), 0);
    }

    #[tokio::test]
    async fn test_index_applier_apply_intersection_with_two_tags() {
        let applier = PredicatesIndexApplier {
            fst_appliers: vec![
                (s("tag-0"), key_fst_applier("tag-0_value-0")),
                (s("tag-1"), key_fst_applier("tag-1_value-a")),
            ],
        };

        let mut reader = MockInvertedIndexReader::new();
        reader
            .expect_metadata()
            .returning(|_| Ok(mock_metas([("tag-0", 0), ("tag-1", 1)])));

        // The FST for each tag is identified by the start of its range.
        reader.expect_fst_vec().returning(|ranges, _metrics| {
            let fsts = ranges
                .iter()
                .map(|range| match range.start {
                    0 => FstMap::from_iter([(b"tag-0_value-0", fst_value(1, 1))]).unwrap(),
                    1 => FstMap::from_iter([(b"tag-1_value-a", fst_value(2, 1))]).unwrap(),
                    _ => unreachable!(),
                })
                .collect::<Vec<_>>();
            Ok(fsts)
        });

        // Each requested (offset, size) pair maps to a fixed bitmap.
        reader
            .expect_bitmap_deque()
            .returning(|requests, _metrics| {
                let mut bitmaps = VecDeque::new();
                for (range, bitmap_type) in requests {
                    let bytes = match (range.start, range.end - range.start, bitmap_type) {
                        (1, 1, BitmapType::Roaring) => [0b10101010],
                        (2, 1, BitmapType::Roaring) => [0b11011011],
                        _ => unreachable!(),
                    };
                    bitmaps.push_back(Bitmap::from_lsb0_bytes(&bytes, *bitmap_type));
                }
                Ok(bitmaps)
            });

        let result = applier
            .apply(SearchContext::default(), &mut reader, None)
            .await
            .unwrap();

        // 0b10101010 & 0b11011011 == 0b10001010
        let expected = Bitmap::from_lsb0_bytes(&[0b10001010], BitmapType::Roaring);
        assert_eq!(result.matched_segment_ids, expected);
    }

    #[tokio::test]
    async fn test_index_applier_without_predicates() {
        let applier = PredicatesIndexApplier {
            fst_appliers: Vec::new(),
        };

        let mut reader = MockInvertedIndexReader::new();
        reader
            .expect_metadata()
            .returning(|_| Ok(mock_metas([("tag-0", 0)])));

        let result = applier
            .apply(SearchContext::default(), &mut reader, None)
            .await
            .unwrap();

        // Without appliers all 8 segments (8 rows, 1 row per segment) match.
        assert_eq!(result.matched_segment_ids, Bitmap::full_bitvec(8));
    }

    #[tokio::test]
    async fn test_index_applier_with_empty_index() {
        // Metadata without rows and without any index meta.
        let mut reader = MockInvertedIndexReader::new();
        reader.expect_metadata().returning(move |_| {
            let empty_metas = InvertedIndexMetas {
                total_row_count: 0,
                segment_row_count: 1,
                ..Default::default()
            };
            Ok(Arc::new(empty_metas))
        });

        // The applier must never be consulted.
        let mut fst_applier = MockFstApplier::new();
        fst_applier.expect_apply().never();

        let applier = PredicatesIndexApplier {
            fst_appliers: vec![(s("tag-0"), Box::new(fst_applier))],
        };

        let result = applier
            .apply(SearchContext::default(), &mut reader, None)
            .await
            .unwrap();
        assert!(result.matched_segment_ids.is_empty());
    }

    #[tokio::test]
    async fn test_index_applier_with_nonexistent_index() {
        // The metadata knows no index at all, so "tag-0" cannot be found.
        let mut reader = MockInvertedIndexReader::new();
        reader
            .expect_metadata()
            .returning(|_| Ok(mock_metas(vec![])));

        let mut fst_applier = MockFstApplier::new();
        fst_applier.expect_apply().never();

        let applier = PredicatesIndexApplier {
            fst_appliers: vec![(s("tag-0"), Box::new(fst_applier))],
        };

        // `ThrowError`: the missing index is reported as an error.
        let throw_context = SearchContext {
            index_not_found_strategy: IndexNotFoundStrategy::ThrowError,
        };
        let thrown = applier.apply(throw_context, &mut reader, None).await;
        assert!(matches!(thrown, Err(Error::IndexNotFound { .. })));

        // `ReturnEmpty`: the result is an empty bitmap.
        let empty_context = SearchContext {
            index_not_found_strategy: IndexNotFoundStrategy::ReturnEmpty,
        };
        let emptied = applier
            .apply(empty_context, &mut reader, None)
            .await
            .unwrap();
        assert!(emptied.matched_segment_ids.is_empty());

        // `Ignore`: the applier is skipped and all 8 segments match.
        let ignore_context = SearchContext {
            index_not_found_strategy: IndexNotFoundStrategy::Ignore,
        };
        let ignored = applier
            .apply(ignore_context, &mut reader, None)
            .await
            .unwrap();
        assert_eq!(ignored.matched_segment_ids, Bitmap::full_bitvec(8));
    }

    #[test]
    fn test_index_applier_memory_usage() {
        let mut fst_applier = MockFstApplier::new();
        fst_applier.expect_memory_usage().returning(|| 100);

        let applier = PredicatesIndexApplier {
            fst_appliers: vec![(s("tag-0"), Box::new(fst_applier))],
        };

        // One vector slot + 5 bytes for the name "tag-0" + 100 bytes reported
        // by the mocked applier.
        let slot_size = size_of::<(IndexName, Box<dyn FstApplier>)>();
        let expected = slot_size + 5 + 100;
        assert_eq!(applier.memory_usage(), expected);
    }
}