1use std::collections::HashSet;
16use std::num::NonZeroUsize;
17use std::sync::Arc;
18use std::sync::atomic::AtomicUsize;
19
20use api::v1::SemanticType;
21use common_telemetry::{debug, warn};
22use datatypes::arrow::record_batch::RecordBatch;
23use datatypes::vectors::Helper;
24use index::inverted_index::create::InvertedIndexCreator;
25use index::inverted_index::create::sort::external_sort::ExternalSorter;
26use index::inverted_index::create::sort_create::SortIndexCreator;
27use index::inverted_index::format::writer::InvertedIndexBlobWriter;
28use index::target::IndexTarget;
29use mito_codec::index::{IndexValueCodec, IndexValuesCodec};
30use mito_codec::row_converter::{CompositeValues, SortField};
31use puffin::puffin_manager::{PuffinWriter, PutOptions};
32use snafu::{ResultExt, ensure};
33use store_api::codec::PrimaryKeyEncoding;
34use store_api::metadata::RegionMetadataRef;
35use store_api::storage::{ColumnId, FileId};
36use tokio::io::duplex;
37use tokio_util::compat::{TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt};
38
39use crate::error::{
40 BiErrorsSnafu, EncodeSnafu, IndexFinishSnafu, OperateAbortedIndexSnafu, PuffinAddBlobSnafu,
41 PushIndexValueSnafu, Result,
42};
43use crate::read::Batch;
44use crate::sst::index::intermediate::{
45 IntermediateLocation, IntermediateManager, TempFileProvider,
46};
47use crate::sst::index::inverted_index::INDEX_BLOB_TYPE;
48use crate::sst::index::puffin_manager::SstPuffinWriter;
49use crate::sst::index::statistics::{ByteCount, RowCount, Statistics};
50use crate::sst::index::{TYPE_INVERTED_INDEX, decode_primary_keys_with_counts};
51
/// Per-column minimum memory threshold (1 MiB) handed to
/// `ExternalSorter::factory`. NOTE(review): presumably the sorter does not
/// spill a column to intermediate files below this size — confirm against
/// `ExternalSorter`.
const MIN_MEMORY_USAGE_THRESHOLD_PER_COLUMN: usize = 1024 * 1024;
/// Buffer size (8 KiB) of the in-memory duplex pipe that streams the encoded
/// index from the index writer into the puffin blob writer.
const PIPE_BUFFER_SIZE_FOR_SENDING_BLOB: usize = 8192;
57
/// Builds an inverted index over the indexed columns of a single SST file.
///
/// Values are pushed into an [`InvertedIndexCreator`] under each column's
/// index target key. On `finish`, the index is written as a blob into a
/// puffin file.
pub struct InvertedIndexer {
    /// The index creator that receives the values pushed per target key.
    index_creator: Box<dyn InvertedIndexCreator>,
    /// Provider of intermediate files used by the external sorter; its files
    /// are removed on cleanup.
    temp_file_provider: Arc<TempFileProvider>,

    /// Codec used to locate and decode primary key (tag) column values.
    codec: IndexValuesCodec,
    /// Reusable buffer holding the encoded form of the value being pushed.
    value_buf: Vec<u8>,

    /// Statistics recorded for update, finish and cleanup operations.
    stats: Statistics,
    /// Whether `abort` has been called; an aborted indexer rejects updates and
    /// `finish`.
    aborted: bool,

    /// Memory usage counter shared with the external sorter factory.
    memory_usage: Arc<AtomicUsize>,

    /// Ids of the indexed columns paired with their precomputed index target key.
    indexed_column_ids: Vec<(ColumnId, String)>,

    /// Region metadata, used to resolve column names and the primary key encoding.
    metadata: RegionMetadataRef,
}
84
85impl InvertedIndexer {
86 pub fn new(
89 sst_file_id: FileId,
90 metadata: &RegionMetadataRef,
91 intermediate_manager: IntermediateManager,
92 memory_usage_threshold: Option<usize>,
93 segment_row_count: NonZeroUsize,
94 indexed_column_ids: HashSet<ColumnId>,
95 ) -> Self {
96 let temp_file_provider = Arc::new(TempFileProvider::new(
97 IntermediateLocation::new(&metadata.region_id, &sst_file_id),
98 intermediate_manager,
99 ));
100
101 let memory_usage = Arc::new(AtomicUsize::new(0));
102
103 let sorter = ExternalSorter::factory(
104 temp_file_provider.clone() as _,
105 Some(MIN_MEMORY_USAGE_THRESHOLD_PER_COLUMN),
106 memory_usage.clone(),
107 memory_usage_threshold,
108 );
109 let index_creator = Box::new(SortIndexCreator::new(sorter, segment_row_count));
110
111 let codec = IndexValuesCodec::from_tag_columns(
112 metadata.primary_key_encoding,
113 metadata.primary_key_columns(),
114 );
115 let indexed_column_ids = indexed_column_ids
116 .into_iter()
117 .map(|col_id| {
118 let target_key = format!("{}", IndexTarget::ColumnId(col_id));
119 (col_id, target_key)
120 })
121 .collect();
122 Self {
123 codec,
124 index_creator,
125 temp_file_provider,
126 value_buf: vec![],
127 stats: Statistics::new(TYPE_INVERTED_INDEX),
128 aborted: false,
129 memory_usage,
130 indexed_column_ids,
131 metadata: metadata.clone(),
132 }
133 }
134
135 pub async fn update(&mut self, batch: &mut Batch) -> Result<()> {
138 ensure!(!self.aborted, OperateAbortedIndexSnafu);
139
140 if batch.is_empty() {
141 return Ok(());
142 }
143
144 if let Err(update_err) = self.do_update(batch).await {
145 if let Err(err) = self.do_cleanup().await {
147 if cfg!(any(test, feature = "test")) {
148 panic!("Failed to clean up index creator, err: {err}",);
149 } else {
150 warn!(err; "Failed to clean up index creator");
151 }
152 }
153 return Err(update_err);
154 }
155
156 Ok(())
157 }
158
159 pub async fn update_flat(&mut self, batch: &RecordBatch) -> Result<()> {
161 ensure!(!self.aborted, OperateAbortedIndexSnafu);
162
163 if batch.num_rows() == 0 {
164 return Ok(());
165 }
166
167 self.do_update_flat(batch).await
168 }
169
170 async fn do_update_flat(&mut self, batch: &RecordBatch) -> Result<()> {
171 let mut guard = self.stats.record_update();
172
173 guard.inc_row_count(batch.num_rows());
174
175 let is_sparse = self.metadata.primary_key_encoding == PrimaryKeyEncoding::Sparse;
176 let mut decoded_pks: Option<Vec<(CompositeValues, usize)>> = None;
177
178 for (col_id, target_key) in &self.indexed_column_ids {
179 let Some(column_meta) = self.metadata.column_by_id(*col_id) else {
180 debug!(
181 "Column {} not found in the metadata during building inverted index",
182 col_id
183 );
184 continue;
185 };
186 let column_name = &column_meta.column_schema.name;
187 if let Some(column_array) = batch.column_by_name(column_name) {
188 let vector = Helper::try_into_vector(column_array.clone())
190 .context(crate::error::ConvertVectorSnafu)?;
191 let sort_field = SortField::new(vector.data_type());
192
193 for row in 0..batch.num_rows() {
194 self.value_buf.clear();
195 let value_ref = vector.get_ref(row);
196
197 if value_ref.is_null() {
198 self.index_creator
199 .push_with_name(target_key, None)
200 .await
201 .context(PushIndexValueSnafu)?;
202 } else {
203 IndexValueCodec::encode_nonnull_value(
204 value_ref,
205 &sort_field,
206 &mut self.value_buf,
207 )
208 .context(EncodeSnafu)?;
209 self.index_creator
210 .push_with_name(target_key, Some(&self.value_buf))
211 .await
212 .context(PushIndexValueSnafu)?;
213 }
214 }
215 } else if is_sparse && column_meta.semantic_type == SemanticType::Tag {
216 if decoded_pks.is_none() {
218 decoded_pks = Some(decode_primary_keys_with_counts(batch, &self.codec)?);
219 }
220
221 let pk_values_with_counts = decoded_pks.as_ref().unwrap();
222 let Some(col_info) = self.codec.pk_col_info(*col_id) else {
223 debug!(
224 "Column {} not found in primary key during building bloom filter index",
225 column_name
226 );
227 continue;
228 };
229 let pk_index = col_info.idx;
230 let field = &col_info.field;
231 for (decoded, count) in pk_values_with_counts {
232 let value = match decoded {
233 CompositeValues::Dense(dense) => dense.get(pk_index).map(|v| &v.1),
234 CompositeValues::Sparse(sparse) => sparse.get(col_id),
235 };
236
237 let elem = value
238 .filter(|v| !v.is_null())
239 .map(|v| {
240 self.value_buf.clear();
241 IndexValueCodec::encode_nonnull_value(
242 v.as_value_ref(),
243 field,
244 &mut self.value_buf,
245 )
246 .context(EncodeSnafu)?;
247 Ok(self.value_buf.as_slice())
248 })
249 .transpose()?;
250
251 self.index_creator
252 .push_with_name_n(target_key, elem, *count)
253 .await
254 .context(PushIndexValueSnafu)?;
255 }
256 } else {
257 debug!(
258 "Column {} not found in the batch during building inverted index",
259 col_id
260 );
261 }
262 }
263
264 Ok(())
265 }
266
267 pub async fn finish(
270 &mut self,
271 puffin_writer: &mut SstPuffinWriter,
272 ) -> Result<(RowCount, ByteCount)> {
273 ensure!(!self.aborted, OperateAbortedIndexSnafu);
274
275 if self.stats.row_count() == 0 {
276 return Ok((0, 0));
278 }
279
280 let finish_res = self.do_finish(puffin_writer).await;
281 if let Err(err) = self.do_cleanup().await {
283 if cfg!(any(test, feature = "test")) {
284 panic!("Failed to clean up index creator, err: {err}",);
285 } else {
286 warn!(err; "Failed to clean up index creator");
287 }
288 }
289
290 finish_res.map(|_| (self.stats.row_count(), self.stats.byte_count()))
291 }
292
293 pub async fn abort(&mut self) -> Result<()> {
295 if self.aborted {
296 return Ok(());
297 }
298 self.aborted = true;
299
300 self.do_cleanup().await
301 }
302
303 async fn do_update(&mut self, batch: &mut Batch) -> Result<()> {
304 let mut guard = self.stats.record_update();
305
306 let n = batch.num_rows();
307 guard.inc_row_count(n);
308
309 for (col_id, target_key) in &self.indexed_column_ids {
310 match self.codec.pk_col_info(*col_id) {
311 Some(col_info) => {
313 let pk_idx = col_info.idx;
314 let field = &col_info.field;
315 let value = batch
316 .pk_col_value(self.codec.decoder(), pk_idx, *col_id)?
317 .filter(|v| !v.is_null())
318 .map(|v| {
319 self.value_buf.clear();
320 IndexValueCodec::encode_nonnull_value(
321 v.as_value_ref(),
322 field,
323 &mut self.value_buf,
324 )
325 .context(EncodeSnafu)?;
326 Ok(self.value_buf.as_slice())
327 })
328 .transpose()?;
329
330 self.index_creator
331 .push_with_name_n(target_key, value, n)
332 .await
333 .context(PushIndexValueSnafu)?;
334 }
335 None => {
337 let Some(values) = batch.field_col_value(*col_id) else {
338 debug!(
339 "Column {} not found in the batch during building inverted index",
340 col_id
341 );
342 continue;
343 };
344 let sort_field = SortField::new(values.data.data_type());
345 for i in 0..n {
346 self.value_buf.clear();
347 let value = values.data.get_ref(i);
348 if value.is_null() {
349 self.index_creator
350 .push_with_name(target_key, None)
351 .await
352 .context(PushIndexValueSnafu)?;
353 } else {
354 IndexValueCodec::encode_nonnull_value(
355 value,
356 &sort_field,
357 &mut self.value_buf,
358 )
359 .context(EncodeSnafu)?;
360 self.index_creator
361 .push_with_name(target_key, Some(&self.value_buf))
362 .await
363 .context(PushIndexValueSnafu)?;
364 }
365 }
366 }
367 }
368 }
369
370 Ok(())
371 }
372
373 async fn do_finish(&mut self, puffin_writer: &mut SstPuffinWriter) -> Result<()> {
392 let mut guard = self.stats.record_finish();
393
394 let (tx, rx) = duplex(PIPE_BUFFER_SIZE_FOR_SENDING_BLOB);
395 let mut index_writer = InvertedIndexBlobWriter::new(tx.compat_write());
396
397 let (index_finish, puffin_add_blob) = futures::join!(
398 self.index_creator
400 .finish(&mut index_writer, index::bitmap::BitmapType::Roaring),
401 puffin_writer.put_blob(
402 INDEX_BLOB_TYPE,
403 rx.compat(),
404 PutOptions::default(),
405 Default::default(),
406 )
407 );
408
409 match (
410 puffin_add_blob.context(PuffinAddBlobSnafu),
411 index_finish.context(IndexFinishSnafu),
412 ) {
413 (Err(e1), Err(e2)) => BiErrorsSnafu {
414 first: Box::new(e1),
415 second: Box::new(e2),
416 }
417 .fail()?,
418
419 (Ok(_), e @ Err(_)) => e?,
420 (e @ Err(_), Ok(_)) => e.map(|_| ())?,
421 (Ok(written_bytes), Ok(_)) => {
422 guard.inc_byte_count(written_bytes);
423 }
424 }
425
426 Ok(())
427 }
428
429 async fn do_cleanup(&mut self) -> Result<()> {
430 let _guard = self.stats.record_cleanup();
431
432 self.temp_file_provider.cleanup().await
433 }
434
435 pub fn column_ids(&self) -> impl Iterator<Item = ColumnId> + '_ {
436 self.indexed_column_ids.iter().map(|(col_id, _)| *col_id)
437 }
438
439 pub fn memory_usage(&self) -> usize {
440 self.memory_usage.load(std::sync::atomic::Ordering::Relaxed)
441 }
442}
443
#[cfg(test)]
mod tests {
    use std::collections::BTreeSet;

    use api::v1::SemanticType;
    use datafusion_expr::{Expr as DfExpr, Operator, binary_expr, col, lit};
    use datatypes::data_type::ConcreteDataType;
    use datatypes::schema::ColumnSchema;
    use datatypes::value::ValueRef;
    use datatypes::vectors::{UInt8Vector, UInt64Vector};
    use futures::future::BoxFuture;
    use mito_codec::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodecExt};
    use object_store::ObjectStore;
    use object_store::services::Memory;
    use puffin::puffin_manager::PuffinManager;
    use puffin::puffin_manager::cache::PuffinMetadataCache;
    use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
    use store_api::region_request::PathType;
    use store_api::storage::RegionId;

    use super::*;
    use crate::access_layer::RegionFilePathFactory;
    use crate::cache::index::inverted_index::InvertedIndexCache;
    use crate::metrics::CACHE_BYTES;
    use crate::read::BatchColumn;
    use crate::sst::file::{RegionFileId, RegionIndexId};
    use crate::sst::index::inverted_index::applier::builder::InvertedIndexApplierBuilder;
    use crate::sst::index::puffin_manager::PuffinManagerFactory;

    /// Returns an in-memory object store.
    fn mock_object_store() -> ObjectStore {
        ObjectStore::new(Memory::default()).unwrap().finish()
    }

    /// Creates an intermediate manager rooted at `path`.
    async fn new_intm_mgr(path: impl AsRef<str>) -> IntermediateManager {
        IntermediateManager::init_fs(path).await.unwrap()
    }

    /// Region with tags `tag_str` (1) and `tag_i32` (2) as the primary key,
    /// timestamp `ts` (3) and field `field_u64` (4).
    fn mock_region_metadata() -> RegionMetadataRef {
        let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 2));
        builder
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "tag_str",
                    ConcreteDataType::string_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Tag,
                column_id: 1,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "tag_i32",
                    ConcreteDataType::int32_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Tag,
                column_id: 2,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "ts",
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Timestamp,
                column_id: 3,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "field_u64",
                    ConcreteDataType::uint64_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Field,
                column_id: 4,
            })
            .primary_key(vec![1, 2]);

        Arc::new(builder.build().unwrap())
    }

    /// Builds a batch with a single dense primary key `(str_tag, i32_tag)` and
    /// one row per value in `u64_field`.
    fn new_batch(
        str_tag: impl AsRef<str>,
        i32_tag: impl Into<i32>,
        u64_field: impl IntoIterator<Item = u64>,
    ) -> Batch {
        let fields = vec![
            (0, SortField::new(ConcreteDataType::string_datatype())),
            (1, SortField::new(ConcreteDataType::int32_datatype())),
        ];
        let codec = DensePrimaryKeyCodec::with_fields(fields);
        let row: [ValueRef; 2] = [str_tag.as_ref().into(), i32_tag.into().into()];
        let primary_key = codec.encode(row.into_iter()).unwrap();

        let u64_field = BatchColumn {
            column_id: 4,
            data: Arc::new(UInt64Vector::from_iter_values(u64_field)),
        };
        let num_rows = u64_field.data.len();

        Batch::new(
            primary_key,
            Arc::new(UInt64Vector::from_iter_values(std::iter::repeat_n(
                0, num_rows,
            ))),
            Arc::new(UInt64Vector::from_iter_values(std::iter::repeat_n(
                0, num_rows,
            ))),
            Arc::new(UInt8Vector::from_iter_values(std::iter::repeat_n(
                1, num_rows,
            ))),
            vec![u64_field],
        )
        .unwrap()
    }

    /// Rows shared by every query test: each `(tag_str, tag_i32)` pair from
    /// `{"aaa", "aab", "abc"} x {1, 2, 3}` with two `field_u64` values.
    ///
    /// Each row becomes a two-row batch and `segment_row_count` is 2, so the
    /// i-th row in sorted order occupies segment `i`. The expected segment ids
    /// in the tests depend on this.
    fn test_rows() -> BTreeSet<(&'static str, i32, [u64; 2])> {
        BTreeSet::from_iter([
            ("aaa", 1, [1, 2]),
            ("aaa", 2, [2, 3]),
            ("aaa", 3, [3, 4]),
            ("aab", 1, [4, 5]),
            ("aab", 2, [5, 6]),
            ("aab", 3, [6, 7]),
            ("abc", 1, [7, 8]),
            ("abc", 2, [8, 9]),
            ("abc", 3, [9, 10]),
        ])
    }

    /// Builds an inverted index over `rows` and returns a closure that applies
    /// a filter expression to it, yielding the matched segment ids.
    async fn build_applier_factory(
        prefix: &str,
        rows: BTreeSet<(&'static str, i32, [u64; 2])>,
    ) -> impl Fn(DfExpr) -> BoxFuture<'static, Vec<usize>> {
        let (d, factory) = PuffinManagerFactory::new_for_test_async(prefix).await;
        let table_dir = "table0".to_string();
        let sst_file_id = FileId::random();
        let object_store = mock_object_store();
        let region_metadata = mock_region_metadata();
        let intm_mgr = new_intm_mgr(d.path().to_string_lossy()).await;
        let memory_threshold = None;
        let segment_row_count = 2;
        let indexed_column_ids = HashSet::from_iter([1, 2, 4]);

        let mut creator = InvertedIndexer::new(
            sst_file_id,
            &region_metadata,
            intm_mgr,
            memory_threshold,
            NonZeroUsize::new(segment_row_count).unwrap(),
            indexed_column_ids.clone(),
        );

        for (str_tag, i32_tag, u64_field) in &rows {
            let mut batch = new_batch(str_tag, *i32_tag, u64_field.iter().copied());
            creator.update(&mut batch).await.unwrap();
        }

        let puffin_manager = factory.build(
            object_store.clone(),
            RegionFilePathFactory::new(table_dir.clone(), PathType::Bare),
        );

        let sst_file_id = RegionFileId::new(region_metadata.region_id, sst_file_id);
        let index_id = RegionIndexId::new(sst_file_id, 0);
        let mut writer = puffin_manager.writer(&index_id).await.unwrap();
        let (row_count, _) = creator.finish(&mut writer).await.unwrap();
        assert_eq!(row_count, rows.len() * segment_row_count);
        writer.finish().await.unwrap();

        move |expr| {
            // Captures `d` so it lives as long as the closure. NOTE(review):
            // presumably a temp-dir guard whose drop removes the files.
            let _d = &d;
            let cache = Arc::new(InvertedIndexCache::new(10, 10, 100));
            let puffin_metadata_cache = Arc::new(PuffinMetadataCache::new(10, &CACHE_BYTES));
            let applier = InvertedIndexApplierBuilder::new(
                table_dir.clone(),
                PathType::Bare,
                object_store.clone(),
                &region_metadata,
                indexed_column_ids.clone(),
                factory.clone(),
            )
            .with_inverted_index_cache(Some(cache))
            .with_puffin_metadata_cache(Some(puffin_metadata_cache))
            .build(&[expr])
            .unwrap()
            .unwrap();
            let sst_metadata = Arc::new(region_metadata.clone());
            let plan = applier.plan_for_sst(&sst_metadata).unwrap().unwrap();
            Box::pin(async move {
                applier
                    .apply(index_id, None, &plan.index_applier, None)
                    .await
                    .unwrap()
                    .matched_segment_ids
                    .iter_ones()
                    .collect()
            })
        }
    }

    #[tokio::test]
    async fn test_create_and_query_get_key() {
        let applier_factory =
            build_applier_factory("test_create_and_query_get_key_", test_rows()).await;

        let expr = col("tag_str").eq(lit("aaa"));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![0, 1, 2]);

        let expr = col("tag_i32").eq(lit(2));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![1, 4, 7]);

        let expr = col("tag_str").eq(lit("aaa")).and(col("tag_i32").eq(lit(2)));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![1]);

        let expr = col("tag_str")
            .eq(lit("aaa"))
            .or(col("tag_str").eq(lit("abc")));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![0, 1, 2, 6, 7, 8]);

        let expr = col("tag_str").in_list(vec![lit("aaa"), lit("abc")], false);
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![0, 1, 2, 6, 7, 8]);

        let expr = col("field_u64").eq(lit(2u64));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![0, 1]);
    }

    #[tokio::test]
    async fn test_create_and_query_range() {
        let applier_factory =
            build_applier_factory("test_create_and_query_range_", test_rows()).await;

        let expr = col("tag_str").between(lit("aaa"), lit("aab"));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![0, 1, 2, 3, 4, 5]);

        let expr = col("tag_i32").between(lit(2), lit(3));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![1, 2, 4, 5, 7, 8]);

        let expr = col("tag_str").between(lit("aaa"), lit("aaa"));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![0, 1, 2]);

        let expr = col("tag_i32").between(lit(2), lit(2));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![1, 4, 7]);

        let expr = col("field_u64").between(lit(2u64), lit(5u64));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![0, 1, 2, 3, 4]);
    }

    #[tokio::test]
    async fn test_create_and_query_comparison() {
        let applier_factory =
            build_applier_factory("test_create_and_query_comparison_", test_rows()).await;

        let expr = col("tag_str").lt(lit("aab"));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![0, 1, 2]);

        let expr = col("tag_i32").lt(lit(2));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![0, 3, 6]);

        let expr = col("field_u64").lt(lit(2u64));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![0]);

        let expr = col("tag_str").gt(lit("aab"));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![6, 7, 8]);

        let expr = col("tag_i32").gt(lit(2));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![2, 5, 8]);

        let expr = col("field_u64").gt(lit(8u64));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![7, 8]);

        let expr = col("tag_str").lt_eq(lit("aab"));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![0, 1, 2, 3, 4, 5]);

        let expr = col("tag_i32").lt_eq(lit(2));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![0, 1, 3, 4, 6, 7]);

        let expr = col("field_u64").lt_eq(lit(2u64));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![0, 1]);

        let expr = col("tag_str").gt_eq(lit("aab"));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![3, 4, 5, 6, 7, 8]);

        let expr = col("tag_i32").gt_eq(lit(2));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![1, 2, 4, 5, 7, 8]);

        let expr = col("field_u64").gt_eq(lit(8u64));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![6, 7, 8]);

        let expr = col("tag_str")
            .gt(lit("aaa"))
            .and(col("tag_str").lt(lit("abc")));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![3, 4, 5]);

        let expr = col("tag_i32").gt(lit(1)).and(col("tag_i32").lt(lit(3)));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![1, 4, 7]);

        let expr = col("field_u64")
            .gt(lit(2u64))
            .and(col("field_u64").lt(lit(9u64)));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![1, 2, 3, 4, 5, 6, 7]);
    }

    #[tokio::test]
    async fn test_create_and_query_regex() {
        let applier_factory =
            build_applier_factory("test_create_and_query_regex_", test_rows()).await;

        let expr = binary_expr(col("tag_str"), Operator::RegexMatch, lit(".*"));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![0, 1, 2, 3, 4, 5, 6, 7, 8]);

        let expr = binary_expr(col("tag_str"), Operator::RegexMatch, lit("a.*c"));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![6, 7, 8]);

        let expr = binary_expr(col("tag_str"), Operator::RegexMatch, lit("a.*b$"));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![3, 4, 5]);

        let expr = binary_expr(col("tag_str"), Operator::RegexMatch, lit("\\w"));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![0, 1, 2, 3, 4, 5, 6, 7, 8]);

        let expr = binary_expr(col("tag_str"), Operator::RegexMatch, lit("\\d"));
        let res = applier_factory(expr).await;
        assert!(res.is_empty());

        let expr = binary_expr(col("tag_str"), Operator::RegexMatch, lit("^aaa$"));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![0, 1, 2]);
    }
}