mito2/sst/index/inverted_index/creator.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use std::collections::HashSet;
16use std::num::NonZeroUsize;
17use std::sync::atomic::AtomicUsize;
18use std::sync::Arc;
19
20use common_telemetry::{debug, warn};
21use index::inverted_index::create::sort::external_sort::ExternalSorter;
22use index::inverted_index::create::sort_create::SortIndexCreator;
23use index::inverted_index::create::InvertedIndexCreator;
24use index::inverted_index::format::writer::InvertedIndexBlobWriter;
25use puffin::puffin_manager::{PuffinWriter, PutOptions};
26use snafu::{ensure, ResultExt};
27use store_api::metadata::RegionMetadataRef;
28use store_api::storage::ColumnId;
29use tokio::io::duplex;
30use tokio_util::compat::{TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt};
31
32use crate::error::{
33    BiErrorsSnafu, IndexFinishSnafu, OperateAbortedIndexSnafu, PuffinAddBlobSnafu,
34    PushIndexValueSnafu, Result,
35};
36use crate::read::Batch;
37use crate::row_converter::SortField;
38use crate::sst::file::FileId;
39use crate::sst::index::codec::{IndexValueCodec, IndexValuesCodec};
40use crate::sst::index::intermediate::{
41    IntermediateLocation, IntermediateManager, TempFileProvider,
42};
43use crate::sst::index::inverted_index::INDEX_BLOB_TYPE;
44use crate::sst::index::puffin_manager::SstPuffinWriter;
45use crate::sst::index::statistics::{ByteCount, RowCount, Statistics};
46use crate::sst::index::TYPE_INVERTED_INDEX;
47
/// Per-column floor for the memory usage threshold handed to the external sorter (1 MiB).
const MIN_MEMORY_USAGE_THRESHOLD_PER_COLUMN: usize = 1 << 20;
50
/// Capacity, in bytes, of the in-memory duplex pipe that streams encoded index data
/// into the puffin blob.
const PIPE_BUFFER_SIZE_FOR_SENDING_BLOB: usize = 8 * 1024;
53
54/// `InvertedIndexer` creates inverted index for SST files.
55pub struct InvertedIndexer {
56    /// The index creator.
57    index_creator: Box<dyn InvertedIndexCreator>,
58    /// The provider of intermediate files.
59    temp_file_provider: Arc<TempFileProvider>,
60
61    /// Codec for decoding primary keys.
62    codec: IndexValuesCodec,
63    /// Reusable buffer for encoding index values.
64    value_buf: Vec<u8>,
65
66    /// Statistics of index creation.
67    stats: Statistics,
68    /// Whether the index creation is aborted.
69    aborted: bool,
70
71    /// The memory usage of the index creator.
72    memory_usage: Arc<AtomicUsize>,
73
74    /// Ids of indexed columns and their names (`to_string` of the column id).
75    indexed_column_ids: Vec<(ColumnId, String)>,
76}
77
78impl InvertedIndexer {
79    /// Creates a new `InvertedIndexer`.
80    /// Should ensure that the number of tag columns is greater than 0.
81    pub fn new(
82        sst_file_id: FileId,
83        metadata: &RegionMetadataRef,
84        intermediate_manager: IntermediateManager,
85        memory_usage_threshold: Option<usize>,
86        segment_row_count: NonZeroUsize,
87        indexed_column_ids: HashSet<ColumnId>,
88    ) -> Self {
89        let temp_file_provider = Arc::new(TempFileProvider::new(
90            IntermediateLocation::new(&metadata.region_id, &sst_file_id),
91            intermediate_manager,
92        ));
93
94        let memory_usage = Arc::new(AtomicUsize::new(0));
95
96        let sorter = ExternalSorter::factory(
97            temp_file_provider.clone() as _,
98            Some(MIN_MEMORY_USAGE_THRESHOLD_PER_COLUMN),
99            memory_usage.clone(),
100            memory_usage_threshold,
101        );
102        let index_creator = Box::new(SortIndexCreator::new(sorter, segment_row_count));
103
104        let codec = IndexValuesCodec::from_tag_columns(
105            metadata.primary_key_encoding,
106            metadata.primary_key_columns(),
107        );
108        let indexed_column_ids = indexed_column_ids
109            .into_iter()
110            .map(|col_id| {
111                let col_id_str = col_id.to_string();
112                (col_id, col_id_str)
113            })
114            .collect();
115        Self {
116            codec,
117            index_creator,
118            temp_file_provider,
119            value_buf: vec![],
120            stats: Statistics::new(TYPE_INVERTED_INDEX),
121            aborted: false,
122            memory_usage,
123            indexed_column_ids,
124        }
125    }
126
127    /// Updates index with a batch of rows.
128    /// Garbage will be cleaned up if failed to update.
129    pub async fn update(&mut self, batch: &mut Batch) -> Result<()> {
130        ensure!(!self.aborted, OperateAbortedIndexSnafu);
131
132        if batch.is_empty() {
133            return Ok(());
134        }
135
136        if let Err(update_err) = self.do_update(batch).await {
137            // clean up garbage if failed to update
138            if let Err(err) = self.do_cleanup().await {
139                if cfg!(any(test, feature = "test")) {
140                    panic!("Failed to clean up index creator, err: {err}",);
141                } else {
142                    warn!(err; "Failed to clean up index creator");
143                }
144            }
145            return Err(update_err);
146        }
147
148        Ok(())
149    }
150
151    /// Finishes index creation and cleans up garbage.
152    /// Returns the number of rows and bytes written.
153    pub async fn finish(
154        &mut self,
155        puffin_writer: &mut SstPuffinWriter,
156    ) -> Result<(RowCount, ByteCount)> {
157        ensure!(!self.aborted, OperateAbortedIndexSnafu);
158
159        if self.stats.row_count() == 0 {
160            // no IO is performed, no garbage to clean up, just return
161            return Ok((0, 0));
162        }
163
164        let finish_res = self.do_finish(puffin_writer).await;
165        // clean up garbage no matter finish successfully or not
166        if let Err(err) = self.do_cleanup().await {
167            if cfg!(any(test, feature = "test")) {
168                panic!("Failed to clean up index creator, err: {err}",);
169            } else {
170                warn!(err; "Failed to clean up index creator");
171            }
172        }
173
174        finish_res.map(|_| (self.stats.row_count(), self.stats.byte_count()))
175    }
176
177    /// Aborts index creation and clean up garbage.
178    pub async fn abort(&mut self) -> Result<()> {
179        if self.aborted {
180            return Ok(());
181        }
182        self.aborted = true;
183
184        self.do_cleanup().await
185    }
186
187    async fn do_update(&mut self, batch: &mut Batch) -> Result<()> {
188        let mut guard = self.stats.record_update();
189
190        let n = batch.num_rows();
191        guard.inc_row_count(n);
192
193        for (col_id, col_id_str) in &self.indexed_column_ids {
194            match self.codec.pk_col_info(*col_id) {
195                // pk
196                Some(col_info) => {
197                    let pk_idx = col_info.idx;
198                    let field = &col_info.field;
199                    let value = batch
200                        .pk_col_value(self.codec.decoder(), pk_idx, *col_id)?
201                        .filter(|v| !v.is_null())
202                        .map(|v| {
203                            self.value_buf.clear();
204                            IndexValueCodec::encode_nonnull_value(
205                                v.as_value_ref(),
206                                field,
207                                &mut self.value_buf,
208                            )?;
209                            Ok(self.value_buf.as_slice())
210                        })
211                        .transpose()?;
212
213                    self.index_creator
214                        .push_with_name_n(col_id_str, value, n)
215                        .await
216                        .context(PushIndexValueSnafu)?;
217                }
218                // fields
219                None => {
220                    let Some(values) = batch.field_col_value(*col_id) else {
221                        debug!(
222                            "Column {} not found in the batch during building inverted index",
223                            col_id
224                        );
225                        continue;
226                    };
227                    let sort_field = SortField::new(values.data.data_type());
228                    for i in 0..n {
229                        self.value_buf.clear();
230                        let value = values.data.get_ref(i);
231                        if value.is_null() {
232                            self.index_creator
233                                .push_with_name(col_id_str, None)
234                                .await
235                                .context(PushIndexValueSnafu)?;
236                        } else {
237                            IndexValueCodec::encode_nonnull_value(
238                                value,
239                                &sort_field,
240                                &mut self.value_buf,
241                            )?;
242                            self.index_creator
243                                .push_with_name(col_id_str, Some(&self.value_buf))
244                                .await
245                                .context(PushIndexValueSnafu)?;
246                        }
247                    }
248                }
249            }
250        }
251
252        Ok(())
253    }
254
255    /// Data flow of finishing index:
256    ///
257    /// ```text
258    ///                               (In Memory Buffer)
259    ///                                    ┌──────┐
260    ///  ┌─────────────┐                   │ PIPE │
261    ///  │             │ write index data  │      │
262    ///  │ IndexWriter ├──────────────────►│ tx   │
263    ///  │             │                   │      │
264    ///  └─────────────┘                   │      │
265    ///                  ┌─────────────────┤ rx   │
266    ///  ┌─────────────┐ │ read as blob    └──────┘
267    ///  │             │ │
268    ///  │ PuffinWriter├─┤
269    ///  │             │ │ copy to file    ┌──────┐
270    ///  └─────────────┘ └────────────────►│ File │
271    ///                                    └──────┘
272    /// ```
273    async fn do_finish(&mut self, puffin_writer: &mut SstPuffinWriter) -> Result<()> {
274        let mut guard = self.stats.record_finish();
275
276        let (tx, rx) = duplex(PIPE_BUFFER_SIZE_FOR_SENDING_BLOB);
277        let mut index_writer = InvertedIndexBlobWriter::new(tx.compat_write());
278
279        let (index_finish, puffin_add_blob) = futures::join!(
280            // TODO(zhongzc): config bitmap type
281            self.index_creator
282                .finish(&mut index_writer, index::bitmap::BitmapType::Roaring),
283            puffin_writer.put_blob(
284                INDEX_BLOB_TYPE,
285                rx.compat(),
286                PutOptions::default(),
287                Default::default(),
288            )
289        );
290
291        match (
292            puffin_add_blob.context(PuffinAddBlobSnafu),
293            index_finish.context(IndexFinishSnafu),
294        ) {
295            (Err(e1), Err(e2)) => BiErrorsSnafu {
296                first: Box::new(e1),
297                second: Box::new(e2),
298            }
299            .fail()?,
300
301            (Ok(_), e @ Err(_)) => e?,
302            (e @ Err(_), Ok(_)) => e.map(|_| ())?,
303            (Ok(written_bytes), Ok(_)) => {
304                guard.inc_byte_count(written_bytes);
305            }
306        }
307
308        Ok(())
309    }
310
311    async fn do_cleanup(&mut self) -> Result<()> {
312        let _guard = self.stats.record_cleanup();
313
314        self.temp_file_provider.cleanup().await
315    }
316
317    pub fn column_ids(&self) -> impl Iterator<Item = ColumnId> + '_ {
318        self.indexed_column_ids.iter().map(|(col_id, _)| *col_id)
319    }
320
321    pub fn memory_usage(&self) -> usize {
322        self.memory_usage.load(std::sync::atomic::Ordering::Relaxed)
323    }
324}
325
#[cfg(test)]
mod tests {
    use std::collections::BTreeSet;

    use api::v1::SemanticType;
    use datafusion_expr::{binary_expr, col, lit, Expr as DfExpr, Operator};
    use datatypes::data_type::ConcreteDataType;
    use datatypes::schema::ColumnSchema;
    use datatypes::value::ValueRef;
    use datatypes::vectors::{UInt64Vector, UInt8Vector};
    use futures::future::BoxFuture;
    use object_store::services::Memory;
    use object_store::ObjectStore;
    use puffin::puffin_manager::cache::PuffinMetadataCache;
    use puffin::puffin_manager::PuffinManager;
    use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
    use store_api::storage::RegionId;

    use super::*;
    use crate::access_layer::RegionFilePathFactory;
    use crate::cache::index::inverted_index::InvertedIndexCache;
    use crate::metrics::CACHE_BYTES;
    use crate::read::BatchColumn;
    use crate::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodecExt};
    use crate::sst::index::inverted_index::applier::builder::InvertedIndexApplierBuilder;
    use crate::sst::index::puffin_manager::PuffinManagerFactory;

    /// One test input: `(tag_str, tag_i32, field_u64 values)`. Each input becomes one batch
    /// with one row per `field_u64` value.
    type TestRow = (&'static str, i32, [u64; 2]);

    fn mock_object_store() -> ObjectStore {
        ObjectStore::new(Memory::default()).unwrap().finish()
    }

    async fn new_intm_mgr(path: impl AsRef<str>) -> IntermediateManager {
        IntermediateManager::init_fs(path).await.unwrap()
    }

    /// Region with tags `tag_str` (1) and `tag_i32` (2), time index `ts` (3) and
    /// field `field_u64` (4); the primary key is `[1, 2]`.
    fn mock_region_metadata() -> RegionMetadataRef {
        let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 2));
        builder
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "tag_str",
                    ConcreteDataType::string_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Tag,
                column_id: 1,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "tag_i32",
                    ConcreteDataType::int32_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Tag,
                column_id: 2,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "ts",
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Timestamp,
                column_id: 3,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "field_u64",
                    ConcreteDataType::uint64_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Field,
                column_id: 4,
            })
            .primary_key(vec![1, 2]);

        Arc::new(builder.build().unwrap())
    }

    /// Builds a batch whose primary key encodes `(str_tag, i32_tag)` and whose `field_u64`
    /// column (id 4) holds `u64_field`. The remaining per-row columns are filled with
    /// constants.
    fn new_batch(
        str_tag: impl AsRef<str>,
        i32_tag: impl Into<i32>,
        u64_field: impl IntoIterator<Item = u64>,
    ) -> Batch {
        let fields = vec![
            (0, SortField::new(ConcreteDataType::string_datatype())),
            (1, SortField::new(ConcreteDataType::int32_datatype())),
        ];
        let codec = DensePrimaryKeyCodec::with_fields(fields);
        let row: [ValueRef; 2] = [str_tag.as_ref().into(), i32_tag.into().into()];
        let primary_key = codec.encode(row.into_iter()).unwrap();

        let u64_field = BatchColumn {
            column_id: 4,
            data: Arc::new(UInt64Vector::from_iter_values(u64_field)),
        };
        let num_rows = u64_field.data.len();

        Batch::new(
            primary_key,
            Arc::new(UInt64Vector::from_iter_values(std::iter::repeat_n(
                0, num_rows,
            ))),
            Arc::new(UInt64Vector::from_iter_values(std::iter::repeat_n(
                0, num_rows,
            ))),
            Arc::new(UInt8Vector::from_iter_values(std::iter::repeat_n(
                1, num_rows,
            ))),
            vec![u64_field],
        )
        .unwrap()
    }

    /// Input rows shared by the query tests.
    ///
    /// Each row yields two SST rows and `segment_row_count` is 2, so segment `i` holds
    /// exactly the `i`-th row in sorted order.
    fn mock_rows() -> BTreeSet<TestRow> {
        BTreeSet::from_iter([
            ("aaa", 1, [1, 2]),
            ("aaa", 2, [2, 3]),
            ("aaa", 3, [3, 4]),
            ("aab", 1, [4, 5]),
            ("aab", 2, [5, 6]),
            ("aab", 3, [6, 7]),
            ("abc", 1, [7, 8]),
            ("abc", 2, [8, 9]),
            ("abc", 3, [9, 10]),
        ])
    }

    /// Indexes `rows` into a fresh puffin file. Returns a closure that applies a filter
    /// expression to that index and yields the matched segment ids.
    async fn build_applier_factory(
        prefix: &str,
        rows: BTreeSet<TestRow>,
    ) -> impl Fn(DfExpr) -> BoxFuture<'static, Vec<usize>> {
        let (d, factory) = PuffinManagerFactory::new_for_test_async(prefix).await;
        let region_dir = "region0".to_string();
        let sst_file_id = FileId::random();
        let object_store = mock_object_store();
        let region_metadata = mock_region_metadata();
        let intm_mgr = new_intm_mgr(d.path().to_string_lossy()).await;
        let memory_threshold = None;
        let segment_row_count = 2;
        let indexed_column_ids = HashSet::from_iter([1, 2, 4]);

        let mut creator = InvertedIndexer::new(
            sst_file_id,
            &region_metadata,
            intm_mgr,
            memory_threshold,
            NonZeroUsize::new(segment_row_count).unwrap(),
            indexed_column_ids.clone(),
        );

        for (str_tag, i32_tag, u64_field) in &rows {
            let mut batch = new_batch(str_tag, *i32_tag, u64_field.iter().copied());
            creator.update(&mut batch).await.unwrap();
        }

        // Each input row contributes one SST row per field value, independent of the
        // segment size.
        let expected_row_count: usize = rows
            .iter()
            .map(|(_, _, field_values)| field_values.len())
            .sum();

        let puffin_manager = factory.build(
            object_store.clone(),
            RegionFilePathFactory::new(region_dir.clone()),
        );
        let mut writer = puffin_manager.writer(&sst_file_id).await.unwrap();
        let (row_count, _) = creator.finish(&mut writer).await.unwrap();
        assert_eq!(row_count, expected_row_count);
        writer.finish().await.unwrap();

        move |expr| {
            // Referencing `d` makes this `move` closure own it, so `d` (which provides the
            // index directory path) stays alive for as long as queries can run.
            let _d = &d;
            let cache = Arc::new(InvertedIndexCache::new(10, 10, 100));
            let puffin_metadata_cache = Arc::new(PuffinMetadataCache::new(10, &CACHE_BYTES));
            let applier = InvertedIndexApplierBuilder::new(
                region_dir.clone(),
                object_store.clone(),
                &region_metadata,
                indexed_column_ids.clone(),
                factory.clone(),
            )
            .with_inverted_index_cache(Some(cache))
            .with_puffin_metadata_cache(Some(puffin_metadata_cache))
            .build(&[expr])
            .unwrap()
            .unwrap();
            Box::pin(async move {
                applier
                    .apply(sst_file_id, None)
                    .await
                    .unwrap()
                    .matched_segment_ids
                    .iter_ones()
                    .collect()
            })
        }
    }

    #[tokio::test]
    async fn test_create_and_query_get_key() {
        let applier_factory =
            build_applier_factory("test_create_and_query_get_key_", mock_rows()).await;

        let expr = col("tag_str").eq(lit("aaa"));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![0, 1, 2]);

        let expr = col("tag_i32").eq(lit(2));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![1, 4, 7]);

        let expr = col("tag_str").eq(lit("aaa")).and(col("tag_i32").eq(lit(2)));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![1]);

        let expr = col("tag_str")
            .eq(lit("aaa"))
            .or(col("tag_str").eq(lit("abc")));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![0, 1, 2, 6, 7, 8]);

        let expr = col("tag_str").in_list(vec![lit("aaa"), lit("abc")], false);
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![0, 1, 2, 6, 7, 8]);

        let expr = col("field_u64").eq(lit(2u64));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![0, 1]);
    }

    #[tokio::test]
    async fn test_create_and_query_range() {
        let applier_factory =
            build_applier_factory("test_create_and_query_range_", mock_rows()).await;

        let expr = col("tag_str").between(lit("aaa"), lit("aab"));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![0, 1, 2, 3, 4, 5]);

        let expr = col("tag_i32").between(lit(2), lit(3));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![1, 2, 4, 5, 7, 8]);

        let expr = col("tag_str").between(lit("aaa"), lit("aaa"));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![0, 1, 2]);

        let expr = col("tag_i32").between(lit(2), lit(2));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![1, 4, 7]);

        let expr = col("field_u64").between(lit(2u64), lit(5u64));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![0, 1, 2, 3, 4]);
    }

    #[tokio::test]
    async fn test_create_and_query_comparison() {
        let applier_factory =
            build_applier_factory("test_create_and_query_comparison_", mock_rows()).await;

        let expr = col("tag_str").lt(lit("aab"));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![0, 1, 2]);

        let expr = col("tag_i32").lt(lit(2));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![0, 3, 6]);

        let expr = col("field_u64").lt(lit(2u64));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![0]);

        let expr = col("tag_str").gt(lit("aab"));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![6, 7, 8]);

        let expr = col("tag_i32").gt(lit(2));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![2, 5, 8]);

        let expr = col("field_u64").gt(lit(8u64));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![7, 8]);

        let expr = col("tag_str").lt_eq(lit("aab"));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![0, 1, 2, 3, 4, 5]);

        let expr = col("tag_i32").lt_eq(lit(2));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![0, 1, 3, 4, 6, 7]);

        let expr = col("field_u64").lt_eq(lit(2u64));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![0, 1]);

        let expr = col("tag_str").gt_eq(lit("aab"));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![3, 4, 5, 6, 7, 8]);

        let expr = col("tag_i32").gt_eq(lit(2));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![1, 2, 4, 5, 7, 8]);

        let expr = col("field_u64").gt_eq(lit(8u64));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![6, 7, 8]);

        let expr = col("tag_str")
            .gt(lit("aaa"))
            .and(col("tag_str").lt(lit("abc")));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![3, 4, 5]);

        let expr = col("tag_i32").gt(lit(1)).and(col("tag_i32").lt(lit(3)));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![1, 4, 7]);

        let expr = col("field_u64")
            .gt(lit(2u64))
            .and(col("field_u64").lt(lit(9u64)));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![1, 2, 3, 4, 5, 6, 7]);
    }

    #[tokio::test]
    async fn test_create_and_query_regex() {
        let applier_factory =
            build_applier_factory("test_create_and_query_regex_", mock_rows()).await;

        let expr = binary_expr(col("tag_str"), Operator::RegexMatch, lit(".*"));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![0, 1, 2, 3, 4, 5, 6, 7, 8]);

        let expr = binary_expr(col("tag_str"), Operator::RegexMatch, lit("a.*c"));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![6, 7, 8]);

        let expr = binary_expr(col("tag_str"), Operator::RegexMatch, lit("a.*b$"));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![3, 4, 5]);

        let expr = binary_expr(col("tag_str"), Operator::RegexMatch, lit("\\w"));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![0, 1, 2, 3, 4, 5, 6, 7, 8]);

        let expr = binary_expr(col("tag_str"), Operator::RegexMatch, lit("\\d"));
        let res = applier_factory(expr).await;
        assert!(res.is_empty());

        let expr = binary_expr(col("tag_str"), Operator::RegexMatch, lit("^aaa$"));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![0, 1, 2]);
    }
}