use std::collections::HashSet;
use std::num::NonZeroUsize;
use std::sync::atomic::AtomicUsize;
use std::sync::Arc;

use common_telemetry::{debug, warn};
use datatypes::arrow::record_batch::RecordBatch;
use datatypes::vectors::Helper;
use index::inverted_index::create::sort::external_sort::ExternalSorter;
use index::inverted_index::create::sort_create::SortIndexCreator;
use index::inverted_index::create::InvertedIndexCreator;
use index::inverted_index::format::writer::InvertedIndexBlobWriter;
use mito_codec::index::{IndexValueCodec, IndexValuesCodec};
use mito_codec::row_converter::SortField;
use puffin::puffin_manager::{PuffinWriter, PutOptions};
use snafu::{ensure, ResultExt};
use store_api::metadata::RegionMetadataRef;
use store_api::storage::ColumnId;
use tokio::io::duplex;
use tokio_util::compat::{TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt};

use crate::error::{
    BiErrorsSnafu, EncodeSnafu, IndexFinishSnafu, OperateAbortedIndexSnafu, PuffinAddBlobSnafu,
    PushIndexValueSnafu, Result,
};
use crate::read::Batch;
use crate::sst::file::FileId;
use crate::sst::index::intermediate::{
    IntermediateLocation, IntermediateManager, TempFileProvider,
};
use crate::sst::index::inverted_index::INDEX_BLOB_TYPE;
use crate::sst::index::puffin_manager::SstPuffinWriter;
use crate::sst::index::statistics::{ByteCount, RowCount, Statistics};
use crate::sst::index::TYPE_INVERTED_INDEX;

/// Minimum memory usage threshold for a single column, forwarded to the external sorter factory.
const MIN_MEMORY_USAGE_THRESHOLD_PER_COLUMN: usize = 1024 * 1024;
/// Buffer size of the in-memory pipe (`tokio::io::duplex`) used to send the index blob to the puffin writer.
const PIPE_BUFFER_SIZE_FOR_SENDING_BLOB: usize = 8192;

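/// `InvertedIndexer` builds an inverted index for the indexed columns of an SST
/// file and streams the resulting blob into a puffin writer.
///
/// A minimal usage sketch (illustrative only, not compiled here; it assumes an
/// `IntermediateManager`, region metadata, a `Batch`, and an `SstPuffinWriter`
/// are already available, and the argument values are placeholders):
///
/// ```ignore
/// let mut indexer = InvertedIndexer::new(
///     sst_file_id,
///     &metadata,
///     intermediate_manager,
///     None,                             // no memory usage threshold
///     NonZeroUsize::new(1024).unwrap(), // rows per index segment
///     HashSet::from_iter([tag_column_id]),
/// );
/// indexer.update(&mut batch).await?;
/// let (row_count, byte_count) = indexer.finish(&mut puffin_writer).await?;
/// ```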
pub struct InvertedIndexer {
    /// The underlying inverted index creator.
    index_creator: Box<dyn InvertedIndexCreator>,
    /// Provides intermediate files for the external sorter.
    temp_file_provider: Arc<TempFileProvider>,

    /// Codec used to decode primary key values.
    codec: IndexValuesCodec,
    /// Reused buffer for encoded index values.
    value_buf: Vec<u8>,

    /// Statistics of index creation.
    stats: Statistics,
    /// Whether the indexer has been aborted.
    aborted: bool,

    /// Tracks the memory usage of index creation.
    memory_usage: Arc<AtomicUsize>,

    /// Ids and string representations of the columns to index.
    indexed_column_ids: Vec<(ColumnId, String)>,

    /// Region metadata, used to resolve column names for flat record batches.
    metadata: RegionMetadataRef,
    /// Cached positions of the indexed columns in a flat `RecordBatch`;
    /// initialized lazily on the first call to `update_flat`.
    column_index_cache: Option<Vec<Option<usize>>>,
}

impl InvertedIndexer {
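    /// Creates a new `InvertedIndexer` that builds the index for
    /// `indexed_column_ids`, producing one index segment per
    /// `segment_row_count` rows and passing `memory_usage_threshold`
    /// on to the external sorters.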
    pub fn new(
        sst_file_id: FileId,
        metadata: &RegionMetadataRef,
        intermediate_manager: IntermediateManager,
        memory_usage_threshold: Option<usize>,
        segment_row_count: NonZeroUsize,
        indexed_column_ids: HashSet<ColumnId>,
    ) -> Self {
        let temp_file_provider = Arc::new(TempFileProvider::new(
            IntermediateLocation::new(&metadata.region_id, &sst_file_id),
            intermediate_manager,
        ));

        let memory_usage = Arc::new(AtomicUsize::new(0));

        let sorter = ExternalSorter::factory(
            temp_file_provider.clone() as _,
            Some(MIN_MEMORY_USAGE_THRESHOLD_PER_COLUMN),
            memory_usage.clone(),
            memory_usage_threshold,
        );
        let index_creator = Box::new(SortIndexCreator::new(sorter, segment_row_count));

        let codec = IndexValuesCodec::from_tag_columns(
            metadata.primary_key_encoding,
            metadata.primary_key_columns(),
        );
        let indexed_column_ids = indexed_column_ids
            .into_iter()
            .map(|col_id| {
                let col_id_str = col_id.to_string();
                (col_id, col_id_str)
            })
            .collect();
        Self {
            codec,
            index_creator,
            temp_file_provider,
            value_buf: vec![],
            stats: Statistics::new(TYPE_INVERTED_INDEX),
            aborted: false,
            memory_usage,
            indexed_column_ids,
            metadata: metadata.clone(),
            column_index_cache: None,
        }
    }

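    /// Updates the index with a primary-key encoded [`Batch`].
    ///
    /// Intermediate files are cleaned up if the update fails.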
    pub async fn update(&mut self, batch: &mut Batch) -> Result<()> {
        ensure!(!self.aborted, OperateAbortedIndexSnafu);

        if batch.is_empty() {
            return Ok(());
        }

        if let Err(update_err) = self.do_update(batch).await {
            if let Err(err) = self.do_cleanup().await {
                if cfg!(any(test, feature = "test")) {
                    panic!("Failed to clean up index creator, err: {err}");
                } else {
                    warn!(err; "Failed to clean up index creator");
                }
            }
            return Err(update_err);
        }

        Ok(())
    }

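    /// Updates the index with a flat-format [`RecordBatch`].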
    pub async fn update_flat(&mut self, batch: &RecordBatch) -> Result<()> {
        ensure!(!self.aborted, OperateAbortedIndexSnafu);

        if batch.num_rows() == 0 {
            return Ok(());
        }

        self.do_update_flat(batch).await
    }

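    /// Pushes the values of the indexed columns in a flat [`RecordBatch`] into
    /// the index creator, resolving column positions via the cached mapping.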
    async fn do_update_flat(&mut self, batch: &RecordBatch) -> Result<()> {
        if self.column_index_cache.is_none() {
            self.initialize_column_index_cache(batch);
        }

        let mut guard = self.stats.record_update();

        let n = batch.num_rows();
        guard.inc_row_count(n);

        let column_indices = self.column_index_cache.as_ref().unwrap();

        for ((col_id, col_id_str), &column_index) in
            self.indexed_column_ids.iter().zip(column_indices.iter())
        {
            if let Some(index) = column_index {
                let column_array = batch.column(index);
                let vector = Helper::try_into_vector(column_array.clone())
                    .context(crate::error::ConvertVectorSnafu)?;
                let sort_field = SortField::new(vector.data_type());

                for row in 0..n {
                    self.value_buf.clear();
                    let value_ref = vector.get_ref(row);

                    if value_ref.is_null() {
                        self.index_creator
                            .push_with_name(col_id_str, None)
                            .await
                            .context(PushIndexValueSnafu)?;
                    } else {
                        IndexValueCodec::encode_nonnull_value(
                            value_ref,
                            &sort_field,
                            &mut self.value_buf,
                        )
                        .context(EncodeSnafu)?;
                        self.index_creator
                            .push_with_name(col_id_str, Some(&self.value_buf))
                            .await
                            .context(PushIndexValueSnafu)?;
                    }
                }
            } else {
                debug!(
                    "Column {} not found in the batch during building inverted index",
                    col_id
                );
            }
        }

        Ok(())
    }

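    /// Resolves and caches the position of each indexed column in the flat
    /// [`RecordBatch`] schema; columns missing from the batch map to `None`.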
    fn initialize_column_index_cache(&mut self, batch: &RecordBatch) {
        let mut column_indices = Vec::with_capacity(self.indexed_column_ids.len());

        for (col_id, _) in &self.indexed_column_ids {
            let column_index = if let Some(column_meta) = self.metadata.column_by_id(*col_id) {
                let column_name = &column_meta.column_schema.name;
                batch
                    .schema()
                    .column_with_name(column_name)
                    .map(|(index, _)| index)
            } else {
                None
            };
            column_indices.push(column_index);
        }

        self.column_index_cache = Some(column_indices);
    }

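    /// Finishes index creation and writes the index blob to the puffin writer.
    ///
    /// Returns the number of indexed rows and the number of bytes written.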
    pub async fn finish(
        &mut self,
        puffin_writer: &mut SstPuffinWriter,
    ) -> Result<(RowCount, ByteCount)> {
        ensure!(!self.aborted, OperateAbortedIndexSnafu);

        if self.stats.row_count() == 0 {
            return Ok((0, 0));
        }

        let finish_res = self.do_finish(puffin_writer).await;
        if let Err(err) = self.do_cleanup().await {
            if cfg!(any(test, feature = "test")) {
                panic!("Failed to clean up index creator, err: {err}");
            } else {
                warn!(err; "Failed to clean up index creator");
            }
        }

        finish_res.map(|_| (self.stats.row_count(), self.stats.byte_count()))
    }

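    /// Aborts index creation and cleans up intermediate resources; later
    /// `update`/`finish` calls will return an error.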
    pub async fn abort(&mut self) -> Result<()> {
        if self.aborted {
            return Ok(());
        }
        self.aborted = true;

        self.do_cleanup().await
    }

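    /// Pushes the indexed values of a primary-key encoded [`Batch`] into the
    /// index creator: tag columns are decoded from the primary key and pushed
    /// once per batch, while field columns are pushed row by row.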
    async fn do_update(&mut self, batch: &mut Batch) -> Result<()> {
        let mut guard = self.stats.record_update();

        let n = batch.num_rows();
        guard.inc_row_count(n);

        for (col_id, col_id_str) in &self.indexed_column_ids {
            match self.codec.pk_col_info(*col_id) {
                Some(col_info) => {
                    let pk_idx = col_info.idx;
                    let field = &col_info.field;
                    let value = batch
                        .pk_col_value(self.codec.decoder(), pk_idx, *col_id)?
                        .filter(|v| !v.is_null())
                        .map(|v| {
                            self.value_buf.clear();
                            IndexValueCodec::encode_nonnull_value(
                                v.as_value_ref(),
                                field,
                                &mut self.value_buf,
                            )
                            .context(EncodeSnafu)?;
                            Ok(self.value_buf.as_slice())
                        })
                        .transpose()?;

                    self.index_creator
                        .push_with_name_n(col_id_str, value, n)
                        .await
                        .context(PushIndexValueSnafu)?;
                }
                None => {
                    let Some(values) = batch.field_col_value(*col_id) else {
                        debug!(
                            "Column {} not found in the batch during building inverted index",
                            col_id
                        );
                        continue;
                    };
                    let sort_field = SortField::new(values.data.data_type());
                    for i in 0..n {
                        self.value_buf.clear();
                        let value = values.data.get_ref(i);
                        if value.is_null() {
                            self.index_creator
                                .push_with_name(col_id_str, None)
                                .await
                                .context(PushIndexValueSnafu)?;
                        } else {
                            IndexValueCodec::encode_nonnull_value(
                                value,
                                &sort_field,
                                &mut self.value_buf,
                            )
                            .context(EncodeSnafu)?;
                            self.index_creator
                                .push_with_name(col_id_str, Some(&self.value_buf))
                                .await
                                .context(PushIndexValueSnafu)?;
                        }
                    }
                }
            }
        }

        Ok(())
    }

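    /// Finishes the index creator and streams the resulting blob into the
    /// puffin writer through an in-memory `duplex` pipe, driving both ends
    /// concurrently with `futures::join!`.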
    async fn do_finish(&mut self, puffin_writer: &mut SstPuffinWriter) -> Result<()> {
        let mut guard = self.stats.record_finish();

        let (tx, rx) = duplex(PIPE_BUFFER_SIZE_FOR_SENDING_BLOB);
        let mut index_writer = InvertedIndexBlobWriter::new(tx.compat_write());

        let (index_finish, puffin_add_blob) = futures::join!(
            self.index_creator
                .finish(&mut index_writer, index::bitmap::BitmapType::Roaring),
            puffin_writer.put_blob(
                INDEX_BLOB_TYPE,
                rx.compat(),
                PutOptions::default(),
                Default::default(),
            )
        );

        match (
            puffin_add_blob.context(PuffinAddBlobSnafu),
            index_finish.context(IndexFinishSnafu),
        ) {
            (Err(e1), Err(e2)) => BiErrorsSnafu {
                first: Box::new(e1),
                second: Box::new(e2),
            }
            .fail()?,

            (Ok(_), e @ Err(_)) => e?,
            (e @ Err(_), Ok(_)) => e.map(|_| ())?,
            (Ok(written_bytes), Ok(_)) => {
                guard.inc_byte_count(written_bytes);
            }
        }

        Ok(())
    }

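    /// Removes the intermediate files created during external sorting.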
    async fn do_cleanup(&mut self) -> Result<()> {
        let _guard = self.stats.record_cleanup();

        self.temp_file_provider.cleanup().await
    }

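    /// Returns the ids of the columns being indexed.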
    pub fn column_ids(&self) -> impl Iterator<Item = ColumnId> + '_ {
        self.indexed_column_ids.iter().map(|(col_id, _)| *col_id)
    }

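    /// Returns the current memory usage of index creation, in bytes.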
    pub fn memory_usage(&self) -> usize {
        self.memory_usage.load(std::sync::atomic::Ordering::Relaxed)
    }
}

#[cfg(test)]
mod tests {
    use std::collections::BTreeSet;

    use api::v1::SemanticType;
    use datafusion_expr::{binary_expr, col, lit, Expr as DfExpr, Operator};
    use datatypes::data_type::ConcreteDataType;
    use datatypes::schema::ColumnSchema;
    use datatypes::value::ValueRef;
    use datatypes::vectors::{UInt64Vector, UInt8Vector};
    use futures::future::BoxFuture;
    use mito_codec::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodecExt};
    use object_store::services::Memory;
    use object_store::ObjectStore;
    use puffin::puffin_manager::cache::PuffinMetadataCache;
    use puffin::puffin_manager::PuffinManager;
    use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
    use store_api::region_request::PathType;
    use store_api::storage::RegionId;

    use super::*;
    use crate::access_layer::RegionFilePathFactory;
    use crate::cache::index::inverted_index::InvertedIndexCache;
    use crate::metrics::CACHE_BYTES;
    use crate::read::BatchColumn;
    use crate::sst::file::RegionFileId;
    use crate::sst::index::inverted_index::applier::builder::InvertedIndexApplierBuilder;
    use crate::sst::index::puffin_manager::PuffinManagerFactory;

    fn mock_object_store() -> ObjectStore {
        ObjectStore::new(Memory::default()).unwrap().finish()
    }

    async fn new_intm_mgr(path: impl AsRef<str>) -> IntermediateManager {
        IntermediateManager::init_fs(path).await.unwrap()
    }

    fn mock_region_metadata() -> RegionMetadataRef {
        let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 2));
        builder
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "tag_str",
                    ConcreteDataType::string_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Tag,
                column_id: 1,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "tag_i32",
                    ConcreteDataType::int32_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Tag,
                column_id: 2,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "ts",
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Timestamp,
                column_id: 3,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "field_u64",
                    ConcreteDataType::uint64_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Field,
                column_id: 4,
            })
            .primary_key(vec![1, 2]);

        Arc::new(builder.build().unwrap())
    }

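    /// Builds a [`Batch`] whose primary key encodes `(str_tag, i32_tag)` with a
    /// dense primary key codec and whose only field column is `field_u64`.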
    fn new_batch(
        str_tag: impl AsRef<str>,
        i32_tag: impl Into<i32>,
        u64_field: impl IntoIterator<Item = u64>,
    ) -> Batch {
        let fields = vec![
            (0, SortField::new(ConcreteDataType::string_datatype())),
            (1, SortField::new(ConcreteDataType::int32_datatype())),
        ];
        let codec = DensePrimaryKeyCodec::with_fields(fields);
        let row: [ValueRef; 2] = [str_tag.as_ref().into(), i32_tag.into().into()];
        let primary_key = codec.encode(row.into_iter()).unwrap();

        let u64_field = BatchColumn {
            column_id: 4,
            data: Arc::new(UInt64Vector::from_iter_values(u64_field)),
        };
        let num_rows = u64_field.data.len();

        Batch::new(
            primary_key,
            Arc::new(UInt64Vector::from_iter_values(std::iter::repeat_n(
                0, num_rows,
            ))),
            Arc::new(UInt64Vector::from_iter_values(std::iter::repeat_n(
                0, num_rows,
            ))),
            Arc::new(UInt8Vector::from_iter_values(std::iter::repeat_n(
                1, num_rows,
            ))),
            vec![u64_field],
        )
        .unwrap()
    }

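    /// Indexes the given rows, writes the result into an in-memory puffin file,
    /// and returns a factory that turns a DataFusion expression into the set of
    /// matched segment ids.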
    async fn build_applier_factory(
        prefix: &str,
        rows: BTreeSet<(&'static str, i32, [u64; 2])>,
    ) -> impl Fn(DfExpr) -> BoxFuture<'static, Vec<usize>> {
        let (d, factory) = PuffinManagerFactory::new_for_test_async(prefix).await;
        let table_dir = "table0".to_string();
        let sst_file_id = FileId::random();
        let object_store = mock_object_store();
        let region_metadata = mock_region_metadata();
        let intm_mgr = new_intm_mgr(d.path().to_string_lossy()).await;
        let memory_threshold = None;
        let segment_row_count = 2;
        let indexed_column_ids = HashSet::from_iter([1, 2, 4]);

        let mut creator = InvertedIndexer::new(
            sst_file_id,
            &region_metadata,
            intm_mgr,
            memory_threshold,
            NonZeroUsize::new(segment_row_count).unwrap(),
            indexed_column_ids.clone(),
        );

        for (str_tag, i32_tag, u64_field) in &rows {
            let mut batch = new_batch(str_tag, *i32_tag, u64_field.iter().copied());
            creator.update(&mut batch).await.unwrap();
        }

        let puffin_manager = factory.build(
            object_store.clone(),
            RegionFilePathFactory::new(table_dir.clone(), PathType::Bare),
        );

        let sst_file_id = RegionFileId::new(region_metadata.region_id, sst_file_id);
        let mut writer = puffin_manager.writer(&sst_file_id).await.unwrap();
        let (row_count, _) = creator.finish(&mut writer).await.unwrap();
        assert_eq!(row_count, rows.len() * segment_row_count);
        writer.finish().await.unwrap();

        move |expr| {
            let _d = &d;
            let cache = Arc::new(InvertedIndexCache::new(10, 10, 100));
            let puffin_metadata_cache = Arc::new(PuffinMetadataCache::new(10, &CACHE_BYTES));
            let applier = InvertedIndexApplierBuilder::new(
                table_dir.clone(),
                PathType::Bare,
                object_store.clone(),
                &region_metadata,
                indexed_column_ids.clone(),
                factory.clone(),
            )
            .with_inverted_index_cache(Some(cache))
            .with_puffin_metadata_cache(Some(puffin_metadata_cache))
            .build(&[expr])
            .unwrap()
            .unwrap();
            Box::pin(async move {
                applier
                    .apply(sst_file_id, None)
                    .await
                    .unwrap()
                    .matched_segment_ids
                    .iter_ones()
                    .collect()
            })
        }
    }

    #[tokio::test]
    async fn test_create_and_query_get_key() {
        let rows = BTreeSet::from_iter([
            ("aaa", 1, [1, 2]),
            ("aaa", 2, [2, 3]),
            ("aaa", 3, [3, 4]),
            ("aab", 1, [4, 5]),
            ("aab", 2, [5, 6]),
            ("aab", 3, [6, 7]),
            ("abc", 1, [7, 8]),
            ("abc", 2, [8, 9]),
            ("abc", 3, [9, 10]),
        ]);

        let applier_factory = build_applier_factory("test_create_and_query_get_key_", rows).await;

        let expr = col("tag_str").eq(lit("aaa"));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![0, 1, 2]);

        let expr = col("tag_i32").eq(lit(2));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![1, 4, 7]);

        let expr = col("tag_str").eq(lit("aaa")).and(col("tag_i32").eq(lit(2)));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![1]);

        let expr = col("tag_str")
            .eq(lit("aaa"))
            .or(col("tag_str").eq(lit("abc")));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![0, 1, 2, 6, 7, 8]);

        let expr = col("tag_str").in_list(vec![lit("aaa"), lit("abc")], false);
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![0, 1, 2, 6, 7, 8]);

        let expr = col("field_u64").eq(lit(2u64));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![0, 1]);
    }

    #[tokio::test]
    async fn test_create_and_query_range() {
        let rows = BTreeSet::from_iter([
            ("aaa", 1, [1, 2]),
            ("aaa", 2, [2, 3]),
            ("aaa", 3, [3, 4]),
            ("aab", 1, [4, 5]),
            ("aab", 2, [5, 6]),
            ("aab", 3, [6, 7]),
            ("abc", 1, [7, 8]),
            ("abc", 2, [8, 9]),
            ("abc", 3, [9, 10]),
        ]);

        let applier_factory = build_applier_factory("test_create_and_query_range_", rows).await;

        let expr = col("tag_str").between(lit("aaa"), lit("aab"));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![0, 1, 2, 3, 4, 5]);

        let expr = col("tag_i32").between(lit(2), lit(3));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![1, 2, 4, 5, 7, 8]);

        let expr = col("tag_str").between(lit("aaa"), lit("aaa"));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![0, 1, 2]);

        let expr = col("tag_i32").between(lit(2), lit(2));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![1, 4, 7]);

        let expr = col("field_u64").between(lit(2u64), lit(5u64));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![0, 1, 2, 3, 4]);
    }

    #[tokio::test]
    async fn test_create_and_query_comparison() {
        let rows = BTreeSet::from_iter([
            ("aaa", 1, [1, 2]),
            ("aaa", 2, [2, 3]),
            ("aaa", 3, [3, 4]),
            ("aab", 1, [4, 5]),
            ("aab", 2, [5, 6]),
            ("aab", 3, [6, 7]),
            ("abc", 1, [7, 8]),
            ("abc", 2, [8, 9]),
            ("abc", 3, [9, 10]),
        ]);

        let applier_factory =
            build_applier_factory("test_create_and_query_comparison_", rows).await;

        let expr = col("tag_str").lt(lit("aab"));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![0, 1, 2]);

        let expr = col("tag_i32").lt(lit(2));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![0, 3, 6]);

        let expr = col("field_u64").lt(lit(2u64));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![0]);

        let expr = col("tag_str").gt(lit("aab"));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![6, 7, 8]);

        let expr = col("tag_i32").gt(lit(2));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![2, 5, 8]);

        let expr = col("field_u64").gt(lit(8u64));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![7, 8]);

        let expr = col("tag_str").lt_eq(lit("aab"));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![0, 1, 2, 3, 4, 5]);

        let expr = col("tag_i32").lt_eq(lit(2));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![0, 1, 3, 4, 6, 7]);

        let expr = col("field_u64").lt_eq(lit(2u64));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![0, 1]);

        let expr = col("tag_str").gt_eq(lit("aab"));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![3, 4, 5, 6, 7, 8]);

        let expr = col("tag_i32").gt_eq(lit(2));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![1, 2, 4, 5, 7, 8]);

        let expr = col("field_u64").gt_eq(lit(8u64));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![6, 7, 8]);

        let expr = col("tag_str")
            .gt(lit("aaa"))
            .and(col("tag_str").lt(lit("abc")));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![3, 4, 5]);

        let expr = col("tag_i32").gt(lit(1)).and(col("tag_i32").lt(lit(3)));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![1, 4, 7]);

        let expr = col("field_u64")
            .gt(lit(2u64))
            .and(col("field_u64").lt(lit(9u64)));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![1, 2, 3, 4, 5, 6, 7]);
    }

    #[tokio::test]
    async fn test_create_and_query_regex() {
        let rows = BTreeSet::from_iter([
            ("aaa", 1, [1, 2]),
            ("aaa", 2, [2, 3]),
            ("aaa", 3, [3, 4]),
            ("aab", 1, [4, 5]),
            ("aab", 2, [5, 6]),
            ("aab", 3, [6, 7]),
            ("abc", 1, [7, 8]),
            ("abc", 2, [8, 9]),
            ("abc", 3, [9, 10]),
        ]);

        let applier_factory = build_applier_factory("test_create_and_query_regex_", rows).await;

        let expr = binary_expr(col("tag_str"), Operator::RegexMatch, lit(".*"));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![0, 1, 2, 3, 4, 5, 6, 7, 8]);

        let expr = binary_expr(col("tag_str"), Operator::RegexMatch, lit("a.*c"));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![6, 7, 8]);

        let expr = binary_expr(col("tag_str"), Operator::RegexMatch, lit("a.*b$"));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![3, 4, 5]);

        let expr = binary_expr(col("tag_str"), Operator::RegexMatch, lit("\\w"));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![0, 1, 2, 3, 4, 5, 6, 7, 8]);

        let expr = binary_expr(col("tag_str"), Operator::RegexMatch, lit("\\d"));
        let res = applier_factory(expr).await;
        assert!(res.is_empty());

        let expr = binary_expr(col("tag_str"), Operator::RegexMatch, lit("^aaa$"));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![0, 1, 2]);
    }
}