use std::collections::HashSet;
use std::num::NonZeroUsize;
use std::sync::Arc;
use std::sync::atomic::AtomicUsize;

use api::v1::SemanticType;
use common_telemetry::{debug, warn};
use datatypes::arrow::record_batch::RecordBatch;
use datatypes::vectors::Helper;
use index::inverted_index::create::InvertedIndexCreator;
use index::inverted_index::create::sort::external_sort::ExternalSorter;
use index::inverted_index::create::sort_create::SortIndexCreator;
use index::inverted_index::format::writer::InvertedIndexBlobWriter;
use index::target::IndexTarget;
use mito_codec::index::{IndexValueCodec, IndexValuesCodec};
use mito_codec::row_converter::{CompositeValues, SortField};
use puffin::puffin_manager::{PuffinWriter, PutOptions};
use snafu::{ResultExt, ensure};
use store_api::codec::PrimaryKeyEncoding;
use store_api::metadata::RegionMetadataRef;
use store_api::storage::{ColumnId, FileId};
use tokio::io::duplex;
use tokio_util::compat::{TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt};

use crate::error::{
    BiErrorsSnafu, EncodeSnafu, IndexFinishSnafu, OperateAbortedIndexSnafu, PuffinAddBlobSnafu,
    PushIndexValueSnafu, Result,
};
use crate::read::Batch;
use crate::sst::index::intermediate::{
    IntermediateLocation, IntermediateManager, TempFileProvider,
};
use crate::sst::index::inverted_index::INDEX_BLOB_TYPE;
use crate::sst::index::puffin_manager::SstPuffinWriter;
use crate::sst::index::statistics::{ByteCount, RowCount, Statistics};
use crate::sst::index::{TYPE_INVERTED_INDEX, decode_primary_keys_with_counts};

/// Minimum memory usage threshold for each indexed column.
const MIN_MEMORY_USAGE_THRESHOLD_PER_COLUMN: usize = 1024 * 1024;
/// Buffer size of the in-memory pipe used to send the index blob to the puffin writer.
const PIPE_BUFFER_SIZE_FOR_SENDING_BLOB: usize = 8192;

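/// Creates an inverted index over the values of the indexed columns of an SST file.
/// Values are fed into an external-sorting based index creator, and the finished
/// index blob is written into the SST's puffin file by [`InvertedIndexer::finish`].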
pub struct InvertedIndexer {
    /// Underlying creator of the inverted index.
    index_creator: Box<dyn InvertedIndexCreator>,
    /// Provider of temporary files for external sorting.
    temp_file_provider: Arc<TempFileProvider>,

    /// Codec for decoding primary key values.
    codec: IndexValuesCodec,
    /// Reused buffer for encoded index values.
    value_buf: Vec<u8>,

    /// Statistics of index creation.
    stats: Statistics,
    /// Whether index creation has been aborted.
    aborted: bool,

    /// Tracked memory usage of the index creator.
    memory_usage: Arc<AtomicUsize>,

    /// Ids and index target keys of the indexed columns.
    indexed_column_ids: Vec<(ColumnId, String)>,

    /// Metadata of the region, used to look up column information.
    metadata: RegionMetadataRef,
}

impl InvertedIndexer {
    /// Creates a new `InvertedIndexer` for the SST file `sst_file_id`, indexing the
    /// columns in `indexed_column_ids`. Intermediate sort files are managed by
    /// `intermediate_manager`, and `memory_usage_threshold` limits the memory used
    /// by the external sorter.
    pub fn new(
        sst_file_id: FileId,
        metadata: &RegionMetadataRef,
        intermediate_manager: IntermediateManager,
        memory_usage_threshold: Option<usize>,
        segment_row_count: NonZeroUsize,
        indexed_column_ids: HashSet<ColumnId>,
    ) -> Self {
        let temp_file_provider = Arc::new(TempFileProvider::new(
            IntermediateLocation::new(&metadata.region_id, &sst_file_id),
            intermediate_manager,
        ));

        let memory_usage = Arc::new(AtomicUsize::new(0));

        let sorter = ExternalSorter::factory(
            temp_file_provider.clone() as _,
            Some(MIN_MEMORY_USAGE_THRESHOLD_PER_COLUMN),
            memory_usage.clone(),
            memory_usage_threshold,
        );
        let index_creator = Box::new(SortIndexCreator::new(sorter, segment_row_count));

        let codec = IndexValuesCodec::from_tag_columns(
            metadata.primary_key_encoding,
            metadata.primary_key_columns(),
        );
        let indexed_column_ids = indexed_column_ids
            .into_iter()
            .map(|col_id| {
                let target_key = format!("{}", IndexTarget::ColumnId(col_id));
                (col_id, target_key)
            })
            .collect();
        Self {
            codec,
            index_creator,
            temp_file_provider,
            value_buf: vec![],
            stats: Statistics::new(TYPE_INVERTED_INDEX),
            aborted: false,
            memory_usage,
            indexed_column_ids,
            metadata: metadata.clone(),
        }
    }

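    /// Updates the index with a [`Batch`]. If the update fails, temporary files are
    /// cleaned up before the error is returned.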
    pub async fn update(&mut self, batch: &mut Batch) -> Result<()> {
        ensure!(!self.aborted, OperateAbortedIndexSnafu);

        if batch.is_empty() {
            return Ok(());
        }

        if let Err(update_err) = self.do_update(batch).await {
            if let Err(err) = self.do_cleanup().await {
                if cfg!(any(test, feature = "test")) {
                    panic!("Failed to clean up index creator, err: {err}",);
                } else {
                    warn!(err; "Failed to clean up index creator");
                }
            }
            return Err(update_err);
        }

        Ok(())
    }

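    /// Updates the index with a flat [`RecordBatch`].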
    pub async fn update_flat(&mut self, batch: &RecordBatch) -> Result<()> {
        ensure!(!self.aborted, OperateAbortedIndexSnafu);

        if batch.num_rows() == 0 {
            return Ok(());
        }

        self.do_update_flat(batch).await
    }

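    /// Indexes a flat [`RecordBatch`]. Columns present in the batch are pushed value
    /// by value; sparse-encoded tag columns that are absent from the batch are
    /// recovered by decoding the primary keys.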
    async fn do_update_flat(&mut self, batch: &RecordBatch) -> Result<()> {
        let mut guard = self.stats.record_update();

        guard.inc_row_count(batch.num_rows());

        let is_sparse = self.metadata.primary_key_encoding == PrimaryKeyEncoding::Sparse;
        let mut decoded_pks: Option<Vec<(CompositeValues, usize)>> = None;

        for (col_id, target_key) in &self.indexed_column_ids {
            let Some(column_meta) = self.metadata.column_by_id(*col_id) else {
                debug!(
                    "Column {} not found in the metadata during building inverted index",
                    col_id
                );
                continue;
            };
            let column_name = &column_meta.column_schema.name;
            if let Some(column_array) = batch.column_by_name(column_name) {
                let vector = Helper::try_into_vector(column_array.clone())
                    .context(crate::error::ConvertVectorSnafu)?;
                let sort_field = SortField::new(vector.data_type());

                for row in 0..batch.num_rows() {
                    self.value_buf.clear();
                    let value_ref = vector.get_ref(row);

                    if value_ref.is_null() {
                        self.index_creator
                            .push_with_name(target_key, None)
                            .await
                            .context(PushIndexValueSnafu)?;
                    } else {
                        IndexValueCodec::encode_nonnull_value(
                            value_ref,
                            &sort_field,
                            &mut self.value_buf,
                        )
                        .context(EncodeSnafu)?;
                        self.index_creator
                            .push_with_name(target_key, Some(&self.value_buf))
                            .await
                            .context(PushIndexValueSnafu)?;
                    }
                }
            } else if is_sparse && column_meta.semantic_type == SemanticType::Tag {
                if decoded_pks.is_none() {
                    decoded_pks = Some(decode_primary_keys_with_counts(batch, &self.codec)?);
                }

                let pk_values_with_counts = decoded_pks.as_ref().unwrap();
                let Some(col_info) = self.codec.pk_col_info(*col_id) else {
                    debug!(
                        "Column {} not found in primary key during building inverted index",
                        column_name
                    );
                    continue;
                };
                let pk_index = col_info.idx;
                let field = &col_info.field;
                for (decoded, count) in pk_values_with_counts {
                    let value = match decoded {
                        CompositeValues::Dense(dense) => dense.get(pk_index).map(|v| &v.1),
                        CompositeValues::Sparse(sparse) => sparse.get(col_id),
                    };

                    let elem = value
                        .filter(|v| !v.is_null())
                        .map(|v| {
                            self.value_buf.clear();
                            IndexValueCodec::encode_nonnull_value(
                                v.as_value_ref(),
                                field,
                                &mut self.value_buf,
                            )
                            .context(EncodeSnafu)?;
                            Ok(self.value_buf.as_slice())
                        })
                        .transpose()?;

                    self.index_creator
                        .push_with_name_n(target_key, elem, *count)
                        .await
                        .context(PushIndexValueSnafu)?;
                }
            } else {
                debug!(
                    "Column {} not found in the batch during building inverted index",
                    col_id
                );
            }
        }

        Ok(())
    }

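    /// Finishes index creation and writes the index blob to the puffin writer.
    /// Returns the number of indexed rows and the size of the written blob.
    /// Temporary files are cleaned up whether or not finishing succeeds.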
    pub async fn finish(
        &mut self,
        puffin_writer: &mut SstPuffinWriter,
    ) -> Result<(RowCount, ByteCount)> {
        ensure!(!self.aborted, OperateAbortedIndexSnafu);

        if self.stats.row_count() == 0 {
            return Ok((0, 0));
        }

        let finish_res = self.do_finish(puffin_writer).await;
        if let Err(err) = self.do_cleanup().await {
            if cfg!(any(test, feature = "test")) {
                panic!("Failed to clean up index creator, err: {err}",);
            } else {
                warn!(err; "Failed to clean up index creator");
            }
        }

        finish_res.map(|_| (self.stats.row_count(), self.stats.byte_count()))
    }

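    /// Aborts index creation and cleans up temporary files.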
    pub async fn abort(&mut self) -> Result<()> {
        if self.aborted {
            return Ok(());
        }
        self.aborted = true;

        self.do_cleanup().await
    }

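    /// Indexes a [`Batch`]. Primary key columns are pushed once with the batch's row
    /// count; field columns are pushed row by row.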
    async fn do_update(&mut self, batch: &mut Batch) -> Result<()> {
        let mut guard = self.stats.record_update();

        let n = batch.num_rows();
        guard.inc_row_count(n);

        for (col_id, target_key) in &self.indexed_column_ids {
            match self.codec.pk_col_info(*col_id) {
                Some(col_info) => {
                    let pk_idx = col_info.idx;
                    let field = &col_info.field;
                    let value = batch
                        .pk_col_value(self.codec.decoder(), pk_idx, *col_id)?
                        .filter(|v| !v.is_null())
                        .map(|v| {
                            self.value_buf.clear();
                            IndexValueCodec::encode_nonnull_value(
                                v.as_value_ref(),
                                field,
                                &mut self.value_buf,
                            )
                            .context(EncodeSnafu)?;
                            Ok(self.value_buf.as_slice())
                        })
                        .transpose()?;

                    self.index_creator
                        .push_with_name_n(target_key, value, n)
                        .await
                        .context(PushIndexValueSnafu)?;
                }
                None => {
                    let Some(values) = batch.field_col_value(*col_id) else {
                        debug!(
                            "Column {} not found in the batch during building inverted index",
                            col_id
                        );
                        continue;
                    };
                    let sort_field = SortField::new(values.data.data_type());
                    for i in 0..n {
                        self.value_buf.clear();
                        let value = values.data.get_ref(i);
                        if value.is_null() {
                            self.index_creator
                                .push_with_name(target_key, None)
                                .await
                                .context(PushIndexValueSnafu)?;
                        } else {
                            IndexValueCodec::encode_nonnull_value(
                                value,
                                &sort_field,
                                &mut self.value_buf,
                            )
                            .context(EncodeSnafu)?;
                            self.index_creator
                                .push_with_name(target_key, Some(&self.value_buf))
                                .await
                                .context(PushIndexValueSnafu)?;
                        }
                    }
                }
            }
        }

        Ok(())
    }

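    /// Finishes the underlying index creator and streams the produced blob into the
    /// puffin writer.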
    async fn do_finish(&mut self, puffin_writer: &mut SstPuffinWriter) -> Result<()> {
        let mut guard = self.stats.record_finish();

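        // Stream the index blob through an in-memory duplex pipe: the index creator
        // writes into `tx` while the puffin writer concurrently reads from `rx`, so
        // the blob never has to be buffered in full.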
        let (tx, rx) = duplex(PIPE_BUFFER_SIZE_FOR_SENDING_BLOB);
        let mut index_writer = InvertedIndexBlobWriter::new(tx.compat_write());

        let (index_finish, puffin_add_blob) = futures::join!(
            self.index_creator
                .finish(&mut index_writer, index::bitmap::BitmapType::Roaring),
            puffin_writer.put_blob(
                INDEX_BLOB_TYPE,
                rx.compat(),
                PutOptions::default(),
                Default::default(),
            )
        );

        match (
            puffin_add_blob.context(PuffinAddBlobSnafu),
            index_finish.context(IndexFinishSnafu),
        ) {
            (Err(e1), Err(e2)) => BiErrorsSnafu {
                first: Box::new(e1),
                second: Box::new(e2),
            }
            .fail()?,

            (Ok(_), e @ Err(_)) => e?,
            (e @ Err(_), Ok(_)) => e.map(|_| ())?,
            (Ok(written_bytes), Ok(_)) => {
                guard.inc_byte_count(written_bytes);
            }
        }

        Ok(())
    }

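    /// Removes the intermediate files created during index creation.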
    async fn do_cleanup(&mut self) -> Result<()> {
        let _guard = self.stats.record_cleanup();

        self.temp_file_provider.cleanup().await
    }

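    /// Returns the ids of the indexed columns.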
    pub fn column_ids(&self) -> impl Iterator<Item = ColumnId> + '_ {
        self.indexed_column_ids.iter().map(|(col_id, _)| *col_id)
    }

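    /// Returns the current memory usage of the index creator.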
    pub fn memory_usage(&self) -> usize {
        self.memory_usage.load(std::sync::atomic::Ordering::Relaxed)
    }
}

#[cfg(test)]
mod tests {
    use std::collections::BTreeSet;

    use api::v1::SemanticType;
    use datafusion_expr::{Expr as DfExpr, Operator, binary_expr, col, lit};
    use datatypes::data_type::ConcreteDataType;
    use datatypes::schema::ColumnSchema;
    use datatypes::value::ValueRef;
    use datatypes::vectors::{UInt8Vector, UInt64Vector};
    use futures::future::BoxFuture;
    use mito_codec::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodecExt};
    use object_store::ObjectStore;
    use object_store::services::Memory;
    use puffin::puffin_manager::PuffinManager;
    use puffin::puffin_manager::cache::PuffinMetadataCache;
    use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
    use store_api::region_request::PathType;
    use store_api::storage::RegionId;

    use super::*;
    use crate::access_layer::RegionFilePathFactory;
    use crate::cache::index::inverted_index::InvertedIndexCache;
    use crate::metrics::CACHE_BYTES;
    use crate::read::BatchColumn;
    use crate::sst::file::RegionFileId;
    use crate::sst::index::inverted_index::applier::builder::InvertedIndexApplierBuilder;
    use crate::sst::index::puffin_manager::PuffinManagerFactory;

    fn mock_object_store() -> ObjectStore {
        ObjectStore::new(Memory::default()).unwrap().finish()
    }

    async fn new_intm_mgr(path: impl AsRef<str>) -> IntermediateManager {
        IntermediateManager::init_fs(path).await.unwrap()
    }

    fn mock_region_metadata() -> RegionMetadataRef {
        let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 2));
        builder
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "tag_str",
                    ConcreteDataType::string_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Tag,
                column_id: 1,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "tag_i32",
                    ConcreteDataType::int32_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Tag,
                column_id: 2,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "ts",
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Timestamp,
                column_id: 3,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "field_u64",
                    ConcreteDataType::uint64_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Field,
                column_id: 4,
            })
            .primary_key(vec![1, 2]);

        Arc::new(builder.build().unwrap())
    }

    fn new_batch(
        str_tag: impl AsRef<str>,
        i32_tag: impl Into<i32>,
        u64_field: impl IntoIterator<Item = u64>,
    ) -> Batch {
        let fields = vec![
            (0, SortField::new(ConcreteDataType::string_datatype())),
            (1, SortField::new(ConcreteDataType::int32_datatype())),
        ];
        let codec = DensePrimaryKeyCodec::with_fields(fields);
        let row: [ValueRef; 2] = [str_tag.as_ref().into(), i32_tag.into().into()];
        let primary_key = codec.encode(row.into_iter()).unwrap();

        let u64_field = BatchColumn {
            column_id: 4,
            data: Arc::new(UInt64Vector::from_iter_values(u64_field)),
        };
        let num_rows = u64_field.data.len();

        Batch::new(
            primary_key,
            Arc::new(UInt64Vector::from_iter_values(std::iter::repeat_n(
                0, num_rows,
            ))),
            Arc::new(UInt64Vector::from_iter_values(std::iter::repeat_n(
                0, num_rows,
            ))),
            Arc::new(UInt8Vector::from_iter_values(std::iter::repeat_n(
                1, num_rows,
            ))),
            vec![u64_field],
        )
        .unwrap()
    }

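    /// Builds an inverted index from `rows`, writes it to a puffin file, and returns
    /// a closure that applies a DataFusion expression to the index and returns the
    /// matched segment ids.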
    async fn build_applier_factory(
        prefix: &str,
        rows: BTreeSet<(&'static str, i32, [u64; 2])>,
    ) -> impl Fn(DfExpr) -> BoxFuture<'static, Vec<usize>> {
        let (d, factory) = PuffinManagerFactory::new_for_test_async(prefix).await;
        let table_dir = "table0".to_string();
        let sst_file_id = FileId::random();
        let object_store = mock_object_store();
        let region_metadata = mock_region_metadata();
        let intm_mgr = new_intm_mgr(d.path().to_string_lossy()).await;
        let memory_threshold = None;
        let segment_row_count = 2;
        let indexed_column_ids = HashSet::from_iter([1, 2, 4]);

        let mut creator = InvertedIndexer::new(
            sst_file_id,
            &region_metadata,
            intm_mgr,
            memory_threshold,
            NonZeroUsize::new(segment_row_count).unwrap(),
            indexed_column_ids.clone(),
        );

        for (str_tag, i32_tag, u64_field) in &rows {
            let mut batch = new_batch(str_tag, *i32_tag, u64_field.iter().copied());
            creator.update(&mut batch).await.unwrap();
        }

        let puffin_manager = factory.build(
            object_store.clone(),
            RegionFilePathFactory::new(table_dir.clone(), PathType::Bare),
        );

        let sst_file_id = RegionFileId::new(region_metadata.region_id, sst_file_id);
        let mut writer = puffin_manager.writer(&sst_file_id).await.unwrap();
        let (row_count, _) = creator.finish(&mut writer).await.unwrap();
        assert_eq!(row_count, rows.len() * segment_row_count);
        writer.finish().await.unwrap();

        move |expr| {
            let _d = &d;
            let cache = Arc::new(InvertedIndexCache::new(10, 10, 100));
            let puffin_metadata_cache = Arc::new(PuffinMetadataCache::new(10, &CACHE_BYTES));
            let applier = InvertedIndexApplierBuilder::new(
                table_dir.clone(),
                PathType::Bare,
                object_store.clone(),
                &region_metadata,
                indexed_column_ids.clone(),
                factory.clone(),
            )
            .with_inverted_index_cache(Some(cache))
            .with_puffin_metadata_cache(Some(puffin_metadata_cache))
            .build(&[expr])
            .unwrap()
            .unwrap();
            Box::pin(async move {
                applier
                    .apply(sst_file_id, None)
                    .await
                    .unwrap()
                    .matched_segment_ids
                    .iter_ones()
                    .collect()
            })
        }
    }

    #[tokio::test]
    async fn test_create_and_query_get_key() {
        let rows = BTreeSet::from_iter([
            ("aaa", 1, [1, 2]),
            ("aaa", 2, [2, 3]),
            ("aaa", 3, [3, 4]),
            ("aab", 1, [4, 5]),
            ("aab", 2, [5, 6]),
            ("aab", 3, [6, 7]),
            ("abc", 1, [7, 8]),
            ("abc", 2, [8, 9]),
            ("abc", 3, [9, 10]),
        ]);

        let applier_factory = build_applier_factory("test_create_and_query_get_key_", rows).await;

        let expr = col("tag_str").eq(lit("aaa"));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![0, 1, 2]);

        let expr = col("tag_i32").eq(lit(2));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![1, 4, 7]);

        let expr = col("tag_str").eq(lit("aaa")).and(col("tag_i32").eq(lit(2)));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![1]);

        let expr = col("tag_str")
            .eq(lit("aaa"))
            .or(col("tag_str").eq(lit("abc")));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![0, 1, 2, 6, 7, 8]);

        let expr = col("tag_str").in_list(vec![lit("aaa"), lit("abc")], false);
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![0, 1, 2, 6, 7, 8]);

        let expr = col("field_u64").eq(lit(2u64));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![0, 1]);
    }

    #[tokio::test]
    async fn test_create_and_query_range() {
        let rows = BTreeSet::from_iter([
            ("aaa", 1, [1, 2]),
            ("aaa", 2, [2, 3]),
            ("aaa", 3, [3, 4]),
            ("aab", 1, [4, 5]),
            ("aab", 2, [5, 6]),
            ("aab", 3, [6, 7]),
            ("abc", 1, [7, 8]),
            ("abc", 2, [8, 9]),
            ("abc", 3, [9, 10]),
        ]);

        let applier_factory = build_applier_factory("test_create_and_query_range_", rows).await;

        let expr = col("tag_str").between(lit("aaa"), lit("aab"));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![0, 1, 2, 3, 4, 5]);

        let expr = col("tag_i32").between(lit(2), lit(3));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![1, 2, 4, 5, 7, 8]);

        let expr = col("tag_str").between(lit("aaa"), lit("aaa"));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![0, 1, 2]);

        let expr = col("tag_i32").between(lit(2), lit(2));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![1, 4, 7]);

        let expr = col("field_u64").between(lit(2u64), lit(5u64));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![0, 1, 2, 3, 4]);
    }

    #[tokio::test]
    async fn test_create_and_query_comparison() {
        let rows = BTreeSet::from_iter([
            ("aaa", 1, [1, 2]),
            ("aaa", 2, [2, 3]),
            ("aaa", 3, [3, 4]),
            ("aab", 1, [4, 5]),
            ("aab", 2, [5, 6]),
            ("aab", 3, [6, 7]),
            ("abc", 1, [7, 8]),
            ("abc", 2, [8, 9]),
            ("abc", 3, [9, 10]),
        ]);

        let applier_factory =
            build_applier_factory("test_create_and_query_comparison_", rows).await;

        let expr = col("tag_str").lt(lit("aab"));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![0, 1, 2]);

        let expr = col("tag_i32").lt(lit(2));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![0, 3, 6]);

        let expr = col("field_u64").lt(lit(2u64));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![0]);

        let expr = col("tag_str").gt(lit("aab"));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![6, 7, 8]);

        let expr = col("tag_i32").gt(lit(2));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![2, 5, 8]);

        let expr = col("field_u64").gt(lit(8u64));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![7, 8]);

        let expr = col("tag_str").lt_eq(lit("aab"));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![0, 1, 2, 3, 4, 5]);

        let expr = col("tag_i32").lt_eq(lit(2));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![0, 1, 3, 4, 6, 7]);

        let expr = col("field_u64").lt_eq(lit(2u64));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![0, 1]);

        let expr = col("tag_str").gt_eq(lit("aab"));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![3, 4, 5, 6, 7, 8]);

        let expr = col("tag_i32").gt_eq(lit(2));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![1, 2, 4, 5, 7, 8]);

        let expr = col("field_u64").gt_eq(lit(8u64));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![6, 7, 8]);

        let expr = col("tag_str")
            .gt(lit("aaa"))
            .and(col("tag_str").lt(lit("abc")));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![3, 4, 5]);

        let expr = col("tag_i32").gt(lit(1)).and(col("tag_i32").lt(lit(3)));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![1, 4, 7]);

        let expr = col("field_u64")
            .gt(lit(2u64))
            .and(col("field_u64").lt(lit(9u64)));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![1, 2, 3, 4, 5, 6, 7]);
    }

    #[tokio::test]
    async fn test_create_and_query_regex() {
        let rows = BTreeSet::from_iter([
            ("aaa", 1, [1, 2]),
            ("aaa", 2, [2, 3]),
            ("aaa", 3, [3, 4]),
            ("aab", 1, [4, 5]),
            ("aab", 2, [5, 6]),
            ("aab", 3, [6, 7]),
            ("abc", 1, [7, 8]),
            ("abc", 2, [8, 9]),
            ("abc", 3, [9, 10]),
        ]);

        let applier_factory = build_applier_factory("test_create_and_query_regex_", rows).await;

        let expr = binary_expr(col("tag_str"), Operator::RegexMatch, lit(".*"));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![0, 1, 2, 3, 4, 5, 6, 7, 8]);

        let expr = binary_expr(col("tag_str"), Operator::RegexMatch, lit("a.*c"));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![6, 7, 8]);

        let expr = binary_expr(col("tag_str"), Operator::RegexMatch, lit("a.*b$"));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![3, 4, 5]);

        let expr = binary_expr(col("tag_str"), Operator::RegexMatch, lit("\\w"));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![0, 1, 2, 3, 4, 5, 6, 7, 8]);

        let expr = binary_expr(col("tag_str"), Operator::RegexMatch, lit("\\d"));
        let res = applier_factory(expr).await;
        assert!(res.is_empty());

        let expr = binary_expr(col("tag_str"), Operator::RegexMatch, lit("^aaa$"));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![0, 1, 2]);
    }
}