use std::collections::HashSet;
use std::num::NonZeroUsize;
use std::sync::Arc;
use std::sync::atomic::AtomicUsize;

use common_telemetry::{debug, warn};
use datatypes::arrow::record_batch::RecordBatch;
use datatypes::vectors::Helper;
use index::inverted_index::create::InvertedIndexCreator;
use index::inverted_index::create::sort::external_sort::ExternalSorter;
use index::inverted_index::create::sort_create::SortIndexCreator;
use index::inverted_index::format::writer::InvertedIndexBlobWriter;
use mito_codec::index::{IndexValueCodec, IndexValuesCodec};
use mito_codec::row_converter::SortField;
use puffin::puffin_manager::{PuffinWriter, PutOptions};
use snafu::{ResultExt, ensure};
use store_api::metadata::RegionMetadataRef;
use store_api::storage::{ColumnId, FileId};
use tokio::io::duplex;
use tokio_util::compat::{TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt};

use crate::error::{
    BiErrorsSnafu, EncodeSnafu, IndexFinishSnafu, OperateAbortedIndexSnafu, PuffinAddBlobSnafu,
    PushIndexValueSnafu, Result,
};
use crate::read::Batch;
use crate::sst::index::TYPE_INVERTED_INDEX;
use crate::sst::index::intermediate::{
    IntermediateLocation, IntermediateManager, TempFileProvider,
};
use crate::sst::index::inverted_index::INDEX_BLOB_TYPE;
use crate::sst::index::puffin_manager::SstPuffinWriter;
use crate::sst::index::statistics::{ByteCount, RowCount, Statistics};

/// Minimum memory usage threshold applied per column by the external sorter.
const MIN_MEMORY_USAGE_THRESHOLD_PER_COLUMN: usize = 1024 * 1024;
/// Buffer size of the in-memory pipe (`duplex`) used to stream the index blob to the puffin writer.
const PIPE_BUFFER_SIZE_FOR_SENDING_BLOB: usize = 8192;

/// Creates an inverted index for the configured columns of an SST file.
pub struct InvertedIndexer {
    /// Creator that sorts values and writes the inverted index.
    index_creator: Box<dyn InvertedIndexCreator>,
    /// Provides intermediate files for external sorting.
    temp_file_provider: Arc<TempFileProvider>,

    /// Codec for decoding primary key values from batches.
    codec: IndexValuesCodec,
    /// Reused buffer for encoded index values.
    value_buf: Vec<u8>,

    /// Statistics of index creation.
    stats: Statistics,
    /// Whether index creation has been aborted.
    aborted: bool,

    /// Tracks the memory usage of the underlying sorter.
    memory_usage: Arc<AtomicUsize>,

    /// Ids of the indexed columns together with their string form used as index names.
    indexed_column_ids: Vec<(ColumnId, String)>,

    /// Region metadata, used to resolve column names for flat record batches.
    metadata: RegionMetadataRef,
    /// Cached positions of the indexed columns in flat record batches.
    column_index_cache: Option<Vec<Option<usize>>>,
}

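// A minimal usage sketch of the indexer lifecycle (illustrative only, not taken from this
// crate's tests; `sst_file_id`, `metadata`, `intermediate_manager`, `batches`, and
// `puffin_writer` are assumed to be supplied by the SST writer that owns the indexer):
//
// ```ignore
// let mut indexer = InvertedIndexer::new(
//     sst_file_id,
//     &metadata,
//     intermediate_manager,
//     None,                             // optional global memory usage threshold
//     NonZeroUsize::new(1024).unwrap(), // rows per index segment
//     HashSet::from_iter([1, 2]),       // column ids to index
// );
// for batch in batches.iter_mut() {
//     indexer.update(batch).await?;
// }
// let (row_count, byte_count) = indexer.finish(&mut puffin_writer).await?;
// ```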
impl InvertedIndexer {
    /// Creates a new `InvertedIndexer` for the given SST file and columns.
    pub fn new(
        sst_file_id: FileId,
        metadata: &RegionMetadataRef,
        intermediate_manager: IntermediateManager,
        memory_usage_threshold: Option<usize>,
        segment_row_count: NonZeroUsize,
        indexed_column_ids: HashSet<ColumnId>,
    ) -> Self {
        let temp_file_provider = Arc::new(TempFileProvider::new(
            IntermediateLocation::new(&metadata.region_id, &sst_file_id),
            intermediate_manager,
        ));

        let memory_usage = Arc::new(AtomicUsize::new(0));

        let sorter = ExternalSorter::factory(
            temp_file_provider.clone() as _,
            Some(MIN_MEMORY_USAGE_THRESHOLD_PER_COLUMN),
            memory_usage.clone(),
            memory_usage_threshold,
        );
        let index_creator = Box::new(SortIndexCreator::new(sorter, segment_row_count));

        let codec = IndexValuesCodec::from_tag_columns(
            metadata.primary_key_encoding,
            metadata.primary_key_columns(),
        );
        let indexed_column_ids = indexed_column_ids
            .into_iter()
            .map(|col_id| {
                let col_id_str = col_id.to_string();
                (col_id, col_id_str)
            })
            .collect();
        Self {
            codec,
            index_creator,
            temp_file_provider,
            value_buf: vec![],
            stats: Statistics::new(TYPE_INVERTED_INDEX),
            aborted: false,
            memory_usage,
            indexed_column_ids,
            metadata: metadata.clone(),
            column_index_cache: None,
        }
    }

    /// Updates the index with a [`Batch`]. Cleans up intermediate files if the update fails.
    pub async fn update(&mut self, batch: &mut Batch) -> Result<()> {
        ensure!(!self.aborted, OperateAbortedIndexSnafu);

        if batch.is_empty() {
            return Ok(());
        }

        if let Err(update_err) = self.do_update(batch).await {
            // Clean up on failure; the original update error is still returned.
            if let Err(err) = self.do_cleanup().await {
                if cfg!(any(test, feature = "test")) {
                    panic!("Failed to clean up index creator, err: {err}");
                } else {
                    warn!(err; "Failed to clean up index creator");
                }
            }
            return Err(update_err);
        }

        Ok(())
    }

    /// Updates the index with a flat [`RecordBatch`].
    pub async fn update_flat(&mut self, batch: &RecordBatch) -> Result<()> {
        ensure!(!self.aborted, OperateAbortedIndexSnafu);

        if batch.num_rows() == 0 {
            return Ok(());
        }

        self.do_update_flat(batch).await
    }

    async fn do_update_flat(&mut self, batch: &RecordBatch) -> Result<()> {
        // Lazily resolve where each indexed column lives in the flat batch schema.
        if self.column_index_cache.is_none() {
            self.initialize_column_index_cache(batch);
        }

        let mut guard = self.stats.record_update();

        let n = batch.num_rows();
        guard.inc_row_count(n);

        let column_indices = self.column_index_cache.as_ref().unwrap();

        for ((col_id, col_id_str), &column_index) in
            self.indexed_column_ids.iter().zip(column_indices.iter())
        {
            if let Some(index) = column_index {
                let column_array = batch.column(index);
                let vector = Helper::try_into_vector(column_array.clone())
                    .context(crate::error::ConvertVectorSnafu)?;
                let sort_field = SortField::new(vector.data_type());

                for row in 0..n {
                    self.value_buf.clear();
                    let value_ref = vector.get_ref(row);

                    if value_ref.is_null() {
                        self.index_creator
                            .push_with_name(col_id_str, None)
                            .await
                            .context(PushIndexValueSnafu)?;
                    } else {
                        IndexValueCodec::encode_nonnull_value(
                            value_ref,
                            &sort_field,
                            &mut self.value_buf,
                        )
                        .context(EncodeSnafu)?;
                        self.index_creator
                            .push_with_name(col_id_str, Some(&self.value_buf))
                            .await
                            .context(PushIndexValueSnafu)?;
                    }
                }
            } else {
                debug!(
                    "Column {} not found in the batch during building inverted index",
                    col_id
                );
            }
        }

        Ok(())
    }

    /// Resolves the position of every indexed column in the flat batch schema and caches it.
    fn initialize_column_index_cache(&mut self, batch: &RecordBatch) {
        let mut column_indices = Vec::with_capacity(self.indexed_column_ids.len());

        for (col_id, _) in &self.indexed_column_ids {
            let column_index = if let Some(column_meta) = self.metadata.column_by_id(*col_id) {
                let column_name = &column_meta.column_schema.name;
                batch
                    .schema()
                    .column_with_name(column_name)
                    .map(|(index, _)| index)
            } else {
                None
            };
            column_indices.push(column_index);
        }

        self.column_index_cache = Some(column_indices);
    }

    /// Finishes index creation and writes the index blob to the puffin writer.
    /// Returns the number of indexed rows and the number of bytes written.
    pub async fn finish(
        &mut self,
        puffin_writer: &mut SstPuffinWriter,
    ) -> Result<(RowCount, ByteCount)> {
        ensure!(!self.aborted, OperateAbortedIndexSnafu);

        if self.stats.row_count() == 0 {
            // No rows pushed, nothing to finish.
            return Ok((0, 0));
        }

        let finish_res = self.do_finish(puffin_writer).await;
        // Clean up intermediate files regardless of whether the finish succeeded.
        if let Err(err) = self.do_cleanup().await {
            if cfg!(any(test, feature = "test")) {
                panic!("Failed to clean up index creator, err: {err}");
            } else {
                warn!(err; "Failed to clean up index creator");
            }
        }

        finish_res.map(|_| (self.stats.row_count(), self.stats.byte_count()))
    }

    /// Aborts index creation and cleans up intermediate files. Idempotent.
    pub async fn abort(&mut self) -> Result<()> {
        if self.aborted {
            return Ok(());
        }
        self.aborted = true;

        self.do_cleanup().await
    }

    async fn do_update(&mut self, batch: &mut Batch) -> Result<()> {
        let mut guard = self.stats.record_update();

        let n = batch.num_rows();
        guard.inc_row_count(n);

        for (col_id, col_id_str) in &self.indexed_column_ids {
            match self.codec.pk_col_info(*col_id) {
                // Primary key column: the decoded value applies to all `n` rows of the
                // batch, so it is pushed once with count `n`.
                Some(col_info) => {
                    let pk_idx = col_info.idx;
                    let field = &col_info.field;
                    let value = batch
                        .pk_col_value(self.codec.decoder(), pk_idx, *col_id)?
                        .filter(|v| !v.is_null())
                        .map(|v| {
                            self.value_buf.clear();
                            IndexValueCodec::encode_nonnull_value(
                                v.as_value_ref(),
                                field,
                                &mut self.value_buf,
                            )
                            .context(EncodeSnafu)?;
                            Ok(self.value_buf.as_slice())
                        })
                        .transpose()?;

                    self.index_creator
                        .push_with_name_n(col_id_str, value, n)
                        .await
                        .context(PushIndexValueSnafu)?;
                }
                // Field column: push each row's value individually.
                None => {
                    let Some(values) = batch.field_col_value(*col_id) else {
                        debug!(
                            "Column {} not found in the batch during building inverted index",
                            col_id
                        );
                        continue;
                    };
                    let sort_field = SortField::new(values.data.data_type());
                    for i in 0..n {
                        self.value_buf.clear();
                        let value = values.data.get_ref(i);
                        if value.is_null() {
                            self.index_creator
                                .push_with_name(col_id_str, None)
                                .await
                                .context(PushIndexValueSnafu)?;
                        } else {
                            IndexValueCodec::encode_nonnull_value(
                                value,
                                &sort_field,
                                &mut self.value_buf,
                            )
                            .context(EncodeSnafu)?;
                            self.index_creator
                                .push_with_name(col_id_str, Some(&self.value_buf))
                                .await
                                .context(PushIndexValueSnafu)?;
                        }
                    }
                }
            }
        }

        Ok(())
    }

    async fn do_finish(&mut self, puffin_writer: &mut SstPuffinWriter) -> Result<()> {
        let mut guard = self.stats.record_finish();

        // Stream the index blob to the puffin writer through an in-memory pipe, driving
        // the producer (index creator) and the consumer (puffin writer) concurrently.
        let (tx, rx) = duplex(PIPE_BUFFER_SIZE_FOR_SENDING_BLOB);
        let mut index_writer = InvertedIndexBlobWriter::new(tx.compat_write());

        let (index_finish, puffin_add_blob) = futures::join!(
            self.index_creator
                .finish(&mut index_writer, index::bitmap::BitmapType::Roaring),
            puffin_writer.put_blob(
                INDEX_BLOB_TYPE,
                rx.compat(),
                PutOptions::default(),
                Default::default(),
            )
        );

        match (
            puffin_add_blob.context(PuffinAddBlobSnafu),
            index_finish.context(IndexFinishSnafu),
        ) {
            // Both sides failed: report both errors.
            (Err(e1), Err(e2)) => BiErrorsSnafu {
                first: Box::new(e1),
                second: Box::new(e2),
            }
            .fail()?,

            (Ok(_), e @ Err(_)) => e?,
            (e @ Err(_), Ok(_)) => e.map(|_| ())?,
            (Ok(written_bytes), Ok(_)) => {
                guard.inc_byte_count(written_bytes);
            }
        }

        Ok(())
    }

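    // The blob hand-off in `do_finish` above uses a generic pattern worth noting: an
    // in-memory pipe from `tokio::io::duplex` plus `futures::join!` drives the index
    // writer (producer) and the puffin writer (consumer) concurrently, so neither side
    // blocks forever on the bounded pipe buffer. A stripped-down sketch of the same idea,
    // with `write_index` and `consume_blob` as hypothetical stand-ins:
    //
    // ```ignore
    // let (tx, rx) = tokio::io::duplex(8192);
    // let (index_res, blob_res) = futures::join!(
    //     write_index(tx.compat_write()), // producer finishes and drops the writer (EOF)
    //     consume_blob(rx.compat()),      // consumer reads until EOF
    // );
    // ```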
    async fn do_cleanup(&mut self) -> Result<()> {
        let _guard = self.stats.record_cleanup();

        self.temp_file_provider.cleanup().await
    }

    /// Returns the ids of the columns being indexed.
    pub fn column_ids(&self) -> impl Iterator<Item = ColumnId> + '_ {
        self.indexed_column_ids.iter().map(|(col_id, _)| *col_id)
    }

    /// Returns the current memory usage of index creation.
    pub fn memory_usage(&self) -> usize {
        self.memory_usage.load(std::sync::atomic::Ordering::Relaxed)
    }
}

#[cfg(test)]
mod tests {
    use std::collections::BTreeSet;

    use api::v1::SemanticType;
    use datafusion_expr::{Expr as DfExpr, Operator, binary_expr, col, lit};
    use datatypes::data_type::ConcreteDataType;
    use datatypes::schema::ColumnSchema;
    use datatypes::value::ValueRef;
    use datatypes::vectors::{UInt8Vector, UInt64Vector};
    use futures::future::BoxFuture;
    use mito_codec::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodecExt};
    use object_store::ObjectStore;
    use object_store::services::Memory;
    use puffin::puffin_manager::PuffinManager;
    use puffin::puffin_manager::cache::PuffinMetadataCache;
    use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
    use store_api::region_request::PathType;
    use store_api::storage::RegionId;

    use super::*;
    use crate::access_layer::RegionFilePathFactory;
    use crate::cache::index::inverted_index::InvertedIndexCache;
    use crate::metrics::CACHE_BYTES;
    use crate::read::BatchColumn;
    use crate::sst::file::RegionFileId;
    use crate::sst::index::inverted_index::applier::builder::InvertedIndexApplierBuilder;
    use crate::sst::index::puffin_manager::PuffinManagerFactory;

    fn mock_object_store() -> ObjectStore {
        ObjectStore::new(Memory::default()).unwrap().finish()
    }

    async fn new_intm_mgr(path: impl AsRef<str>) -> IntermediateManager {
        IntermediateManager::init_fs(path).await.unwrap()
    }

    fn mock_region_metadata() -> RegionMetadataRef {
        let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 2));
        builder
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "tag_str",
                    ConcreteDataType::string_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Tag,
                column_id: 1,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "tag_i32",
                    ConcreteDataType::int32_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Tag,
                column_id: 2,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "ts",
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Timestamp,
                column_id: 3,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "field_u64",
                    ConcreteDataType::uint64_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Field,
                column_id: 4,
            })
            .primary_key(vec![1, 2]);

        Arc::new(builder.build().unwrap())
    }

    fn new_batch(
        str_tag: impl AsRef<str>,
        i32_tag: impl Into<i32>,
        u64_field: impl IntoIterator<Item = u64>,
    ) -> Batch {
        let fields = vec![
            (0, SortField::new(ConcreteDataType::string_datatype())),
            (1, SortField::new(ConcreteDataType::int32_datatype())),
        ];
        let codec = DensePrimaryKeyCodec::with_fields(fields);
        let row: [ValueRef; 2] = [str_tag.as_ref().into(), i32_tag.into().into()];
        let primary_key = codec.encode(row.into_iter()).unwrap();

        let u64_field = BatchColumn {
            column_id: 4,
            data: Arc::new(UInt64Vector::from_iter_values(u64_field)),
        };
        let num_rows = u64_field.data.len();

        Batch::new(
            primary_key,
            Arc::new(UInt64Vector::from_iter_values(std::iter::repeat_n(
                0, num_rows,
            ))),
            Arc::new(UInt64Vector::from_iter_values(std::iter::repeat_n(
                0, num_rows,
            ))),
            Arc::new(UInt8Vector::from_iter_values(std::iter::repeat_n(
                1, num_rows,
            ))),
            vec![u64_field],
        )
        .unwrap()
    }

    async fn build_applier_factory(
        prefix: &str,
        rows: BTreeSet<(&'static str, i32, [u64; 2])>,
    ) -> impl Fn(DfExpr) -> BoxFuture<'static, Vec<usize>> {
        let (d, factory) = PuffinManagerFactory::new_for_test_async(prefix).await;
        let table_dir = "table0".to_string();
        let sst_file_id = FileId::random();
        let object_store = mock_object_store();
        let region_metadata = mock_region_metadata();
        let intm_mgr = new_intm_mgr(d.path().to_string_lossy()).await;
        let memory_threshold = None;
        let segment_row_count = 2;
        let indexed_column_ids = HashSet::from_iter([1, 2, 4]);

        let mut creator = InvertedIndexer::new(
            sst_file_id,
            &region_metadata,
            intm_mgr,
            memory_threshold,
            NonZeroUsize::new(segment_row_count).unwrap(),
            indexed_column_ids.clone(),
        );

        for (str_tag, i32_tag, u64_field) in &rows {
            let mut batch = new_batch(str_tag, *i32_tag, u64_field.iter().copied());
            creator.update(&mut batch).await.unwrap();
        }

        let puffin_manager = factory.build(
            object_store.clone(),
            RegionFilePathFactory::new(table_dir.clone(), PathType::Bare),
        );

        let sst_file_id = RegionFileId::new(region_metadata.region_id, sst_file_id);
        let mut writer = puffin_manager.writer(&sst_file_id).await.unwrap();
        let (row_count, _) = creator.finish(&mut writer).await.unwrap();
        // Each 2-row batch fills exactly one segment (`segment_row_count == 2`).
        assert_eq!(row_count, rows.len() * segment_row_count);
        writer.finish().await.unwrap();

        move |expr| {
            let _d = &d;
            let cache = Arc::new(InvertedIndexCache::new(10, 10, 100));
            let puffin_metadata_cache = Arc::new(PuffinMetadataCache::new(10, &CACHE_BYTES));
            let applier = InvertedIndexApplierBuilder::new(
                table_dir.clone(),
                PathType::Bare,
                object_store.clone(),
                &region_metadata,
                indexed_column_ids.clone(),
                factory.clone(),
            )
            .with_inverted_index_cache(Some(cache))
            .with_puffin_metadata_cache(Some(puffin_metadata_cache))
            .build(&[expr])
            .unwrap()
            .unwrap();
            Box::pin(async move {
                applier
                    .apply(sst_file_id, None)
                    .await
                    .unwrap()
                    .matched_segment_ids
                    .iter_ones()
                    .collect()
            })
        }
    }

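    // Note on the expected segment ids in the tests below: each `(str_tag, i32_tag)` pair
    // contributes one 2-row batch and `segment_row_count` is 2, so segment `i` corresponds
    // to the `i`-th row tuple in the `BTreeSet` iteration order of `rows`.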
    #[tokio::test]
    async fn test_create_and_query_get_key() {
        let rows = BTreeSet::from_iter([
            ("aaa", 1, [1, 2]),
            ("aaa", 2, [2, 3]),
            ("aaa", 3, [3, 4]),
            ("aab", 1, [4, 5]),
            ("aab", 2, [5, 6]),
            ("aab", 3, [6, 7]),
            ("abc", 1, [7, 8]),
            ("abc", 2, [8, 9]),
            ("abc", 3, [9, 10]),
        ]);

        let applier_factory = build_applier_factory("test_create_and_query_get_key_", rows).await;

        let expr = col("tag_str").eq(lit("aaa"));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![0, 1, 2]);

        let expr = col("tag_i32").eq(lit(2));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![1, 4, 7]);

        let expr = col("tag_str").eq(lit("aaa")).and(col("tag_i32").eq(lit(2)));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![1]);

        let expr = col("tag_str")
            .eq(lit("aaa"))
            .or(col("tag_str").eq(lit("abc")));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![0, 1, 2, 6, 7, 8]);

        let expr = col("tag_str").in_list(vec![lit("aaa"), lit("abc")], false);
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![0, 1, 2, 6, 7, 8]);

        let expr = col("field_u64").eq(lit(2u64));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![0, 1]);
    }

    #[tokio::test]
    async fn test_create_and_query_range() {
        let rows = BTreeSet::from_iter([
            ("aaa", 1, [1, 2]),
            ("aaa", 2, [2, 3]),
            ("aaa", 3, [3, 4]),
            ("aab", 1, [4, 5]),
            ("aab", 2, [5, 6]),
            ("aab", 3, [6, 7]),
            ("abc", 1, [7, 8]),
            ("abc", 2, [8, 9]),
            ("abc", 3, [9, 10]),
        ]);

        let applier_factory = build_applier_factory("test_create_and_query_range_", rows).await;

        let expr = col("tag_str").between(lit("aaa"), lit("aab"));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![0, 1, 2, 3, 4, 5]);

        let expr = col("tag_i32").between(lit(2), lit(3));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![1, 2, 4, 5, 7, 8]);

        let expr = col("tag_str").between(lit("aaa"), lit("aaa"));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![0, 1, 2]);

        let expr = col("tag_i32").between(lit(2), lit(2));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![1, 4, 7]);

        let expr = col("field_u64").between(lit(2u64), lit(5u64));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![0, 1, 2, 3, 4]);
    }

    #[tokio::test]
    async fn test_create_and_query_comparison() {
        let rows = BTreeSet::from_iter([
            ("aaa", 1, [1, 2]),
            ("aaa", 2, [2, 3]),
            ("aaa", 3, [3, 4]),
            ("aab", 1, [4, 5]),
            ("aab", 2, [5, 6]),
            ("aab", 3, [6, 7]),
            ("abc", 1, [7, 8]),
            ("abc", 2, [8, 9]),
            ("abc", 3, [9, 10]),
        ]);

        let applier_factory =
            build_applier_factory("test_create_and_query_comparison_", rows).await;

        let expr = col("tag_str").lt(lit("aab"));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![0, 1, 2]);

        let expr = col("tag_i32").lt(lit(2));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![0, 3, 6]);

        let expr = col("field_u64").lt(lit(2u64));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![0]);

        let expr = col("tag_str").gt(lit("aab"));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![6, 7, 8]);

        let expr = col("tag_i32").gt(lit(2));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![2, 5, 8]);

        let expr = col("field_u64").gt(lit(8u64));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![7, 8]);

        let expr = col("tag_str").lt_eq(lit("aab"));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![0, 1, 2, 3, 4, 5]);

        let expr = col("tag_i32").lt_eq(lit(2));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![0, 1, 3, 4, 6, 7]);

        let expr = col("field_u64").lt_eq(lit(2u64));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![0, 1]);

        let expr = col("tag_str").gt_eq(lit("aab"));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![3, 4, 5, 6, 7, 8]);

        let expr = col("tag_i32").gt_eq(lit(2));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![1, 2, 4, 5, 7, 8]);

        let expr = col("field_u64").gt_eq(lit(8u64));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![6, 7, 8]);

        let expr = col("tag_str")
            .gt(lit("aaa"))
            .and(col("tag_str").lt(lit("abc")));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![3, 4, 5]);

        let expr = col("tag_i32").gt(lit(1)).and(col("tag_i32").lt(lit(3)));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![1, 4, 7]);

        let expr = col("field_u64")
            .gt(lit(2u64))
            .and(col("field_u64").lt(lit(9u64)));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![1, 2, 3, 4, 5, 6, 7]);
    }

    #[tokio::test]
    async fn test_create_and_query_regex() {
        let rows = BTreeSet::from_iter([
            ("aaa", 1, [1, 2]),
            ("aaa", 2, [2, 3]),
            ("aaa", 3, [3, 4]),
            ("aab", 1, [4, 5]),
            ("aab", 2, [5, 6]),
            ("aab", 3, [6, 7]),
            ("abc", 1, [7, 8]),
            ("abc", 2, [8, 9]),
            ("abc", 3, [9, 10]),
        ]);

        let applier_factory = build_applier_factory("test_create_and_query_regex_", rows).await;

        let expr = binary_expr(col("tag_str"), Operator::RegexMatch, lit(".*"));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![0, 1, 2, 3, 4, 5, 6, 7, 8]);

        let expr = binary_expr(col("tag_str"), Operator::RegexMatch, lit("a.*c"));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![6, 7, 8]);

        let expr = binary_expr(col("tag_str"), Operator::RegexMatch, lit("a.*b$"));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![3, 4, 5]);

        let expr = binary_expr(col("tag_str"), Operator::RegexMatch, lit("\\w"));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![0, 1, 2, 3, 4, 5, 6, 7, 8]);

        let expr = binary_expr(col("tag_str"), Operator::RegexMatch, lit("\\d"));
        let res = applier_factory(expr).await;
        assert!(res.is_empty());

        let expr = binary_expr(col("tag_str"), Operator::RegexMatch, lit("^aaa$"));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![0, 1, 2]);
    }
}