mito2/sst/index/inverted_index/
creator.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashSet;
16use std::num::NonZeroUsize;
17use std::sync::atomic::AtomicUsize;
18use std::sync::Arc;
19
20use common_telemetry::{debug, warn};
21use index::inverted_index::create::sort::external_sort::ExternalSorter;
22use index::inverted_index::create::sort_create::SortIndexCreator;
23use index::inverted_index::create::InvertedIndexCreator;
24use index::inverted_index::format::writer::InvertedIndexBlobWriter;
25use mito_codec::index::{IndexValueCodec, IndexValuesCodec};
26use mito_codec::row_converter::SortField;
27use puffin::puffin_manager::{PuffinWriter, PutOptions};
28use snafu::{ensure, ResultExt};
29use store_api::metadata::RegionMetadataRef;
30use store_api::storage::ColumnId;
31use tokio::io::duplex;
32use tokio_util::compat::{TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt};
33
34use crate::error::{
35    BiErrorsSnafu, EncodeSnafu, IndexFinishSnafu, OperateAbortedIndexSnafu, PuffinAddBlobSnafu,
36    PushIndexValueSnafu, Result,
37};
38use crate::read::Batch;
39use crate::sst::file::FileId;
40use crate::sst::index::intermediate::{
41    IntermediateLocation, IntermediateManager, TempFileProvider,
42};
43use crate::sst::index::inverted_index::INDEX_BLOB_TYPE;
44use crate::sst::index::puffin_manager::SstPuffinWriter;
45use crate::sst::index::statistics::{ByteCount, RowCount, Statistics};
46use crate::sst::index::TYPE_INVERTED_INDEX;
47
/// Lower bound, in bytes, of the memory usage threshold granted to a single column.
const MIN_MEMORY_USAGE_THRESHOLD_PER_COLUMN: usize = 1 << 20; // 1 MiB

/// Capacity, in bytes, of the in-memory pipe that carries index data to the puffin blob.
const PIPE_BUFFER_SIZE_FOR_SENDING_BLOB: usize = 8 * 1024;
53
/// `InvertedIndexer` creates inverted index for SST files.
///
/// Rows are fed in with `update`, the index blob is written by `finish`, and `abort`
/// discards the work in progress. Once aborted, `update` and `finish` return an error.
pub struct InvertedIndexer {
    /// The index creator that receives the encoded values of every indexed column.
    index_creator: Box<dyn InvertedIndexCreator>,
    /// The provider of intermediate files; its `cleanup` is invoked to remove them.
    temp_file_provider: Arc<TempFileProvider>,

    /// Codec for decoding primary keys and for looking up primary key column info.
    codec: IndexValuesCodec,
    /// Reusable buffer for encoding index values; it is cleared before each encode.
    value_buf: Vec<u8>,

    /// Statistics of index creation (row count and byte count are read from it).
    stats: Statistics,
    /// Whether the index creation is aborted.
    aborted: bool,

    /// The memory usage of the index creator; a clone of this counter is handed to the sorter.
    memory_usage: Arc<AtomicUsize>,

    /// Ids of indexed columns and their names (`to_string` of the column id).
    indexed_column_ids: Vec<(ColumnId, String)>,
}
77
78impl InvertedIndexer {
79    /// Creates a new `InvertedIndexer`.
80    /// Should ensure that the number of tag columns is greater than 0.
81    pub fn new(
82        sst_file_id: FileId,
83        metadata: &RegionMetadataRef,
84        intermediate_manager: IntermediateManager,
85        memory_usage_threshold: Option<usize>,
86        segment_row_count: NonZeroUsize,
87        indexed_column_ids: HashSet<ColumnId>,
88    ) -> Self {
89        let temp_file_provider = Arc::new(TempFileProvider::new(
90            IntermediateLocation::new(&metadata.region_id, &sst_file_id),
91            intermediate_manager,
92        ));
93
94        let memory_usage = Arc::new(AtomicUsize::new(0));
95
96        let sorter = ExternalSorter::factory(
97            temp_file_provider.clone() as _,
98            Some(MIN_MEMORY_USAGE_THRESHOLD_PER_COLUMN),
99            memory_usage.clone(),
100            memory_usage_threshold,
101        );
102        let index_creator = Box::new(SortIndexCreator::new(sorter, segment_row_count));
103
104        let codec = IndexValuesCodec::from_tag_columns(
105            metadata.primary_key_encoding,
106            metadata.primary_key_columns(),
107        );
108        let indexed_column_ids = indexed_column_ids
109            .into_iter()
110            .map(|col_id| {
111                let col_id_str = col_id.to_string();
112                (col_id, col_id_str)
113            })
114            .collect();
115        Self {
116            codec,
117            index_creator,
118            temp_file_provider,
119            value_buf: vec![],
120            stats: Statistics::new(TYPE_INVERTED_INDEX),
121            aborted: false,
122            memory_usage,
123            indexed_column_ids,
124        }
125    }
126
127    /// Updates index with a batch of rows.
128    /// Garbage will be cleaned up if failed to update.
129    pub async fn update(&mut self, batch: &mut Batch) -> Result<()> {
130        ensure!(!self.aborted, OperateAbortedIndexSnafu);
131
132        if batch.is_empty() {
133            return Ok(());
134        }
135
136        if let Err(update_err) = self.do_update(batch).await {
137            // clean up garbage if failed to update
138            if let Err(err) = self.do_cleanup().await {
139                if cfg!(any(test, feature = "test")) {
140                    panic!("Failed to clean up index creator, err: {err}",);
141                } else {
142                    warn!(err; "Failed to clean up index creator");
143                }
144            }
145            return Err(update_err);
146        }
147
148        Ok(())
149    }
150
151    /// Finishes index creation and cleans up garbage.
152    /// Returns the number of rows and bytes written.
153    pub async fn finish(
154        &mut self,
155        puffin_writer: &mut SstPuffinWriter,
156    ) -> Result<(RowCount, ByteCount)> {
157        ensure!(!self.aborted, OperateAbortedIndexSnafu);
158
159        if self.stats.row_count() == 0 {
160            // no IO is performed, no garbage to clean up, just return
161            return Ok((0, 0));
162        }
163
164        let finish_res = self.do_finish(puffin_writer).await;
165        // clean up garbage no matter finish successfully or not
166        if let Err(err) = self.do_cleanup().await {
167            if cfg!(any(test, feature = "test")) {
168                panic!("Failed to clean up index creator, err: {err}",);
169            } else {
170                warn!(err; "Failed to clean up index creator");
171            }
172        }
173
174        finish_res.map(|_| (self.stats.row_count(), self.stats.byte_count()))
175    }
176
177    /// Aborts index creation and clean up garbage.
178    pub async fn abort(&mut self) -> Result<()> {
179        if self.aborted {
180            return Ok(());
181        }
182        self.aborted = true;
183
184        self.do_cleanup().await
185    }
186
    /// Pushes the values of every indexed column of `batch` into the index creator.
    ///
    /// The batch's row count is added to the statistics before any value is pushed.
    /// A column known to the primary key codec contributes a single value that is pushed
    /// once for all `n` rows of the batch; any other column is looked up among the batch's
    /// field columns and pushed row by row. An indexed column that is not present in the
    /// batch is skipped with a debug log.
    ///
    /// # Errors
    ///
    /// Propagates the error from reading the primary key value, an encoding failure
    /// (`EncodeSnafu` context), or a failure to push a value into the index creator
    /// (`PushIndexValueSnafu` context).
    async fn do_update(&mut self, batch: &mut Batch) -> Result<()> {
        // Guard obtained from the statistics; the row count is recorded through it.
        let mut guard = self.stats.record_update();

        let n = batch.num_rows();
        guard.inc_row_count(n);

        for (col_id, col_id_str) in &self.indexed_column_ids {
            match self.codec.pk_col_info(*col_id) {
                // pk
                Some(col_info) => {
                    let pk_idx = col_info.idx;
                    let field = &col_info.field;
                    // A null or missing primary key value becomes `None`; otherwise the
                    // value is encoded into the reusable buffer and borrowed from it.
                    let value = batch
                        .pk_col_value(self.codec.decoder(), pk_idx, *col_id)?
                        .filter(|v| !v.is_null())
                        .map(|v| {
                            self.value_buf.clear();
                            IndexValueCodec::encode_nonnull_value(
                                v.as_value_ref(),
                                field,
                                &mut self.value_buf,
                            )
                            .context(EncodeSnafu)?;
                            Ok(self.value_buf.as_slice())
                        })
                        .transpose()?;

                    // The same value is pushed for all `n` rows in one call.
                    self.index_creator
                        .push_with_name_n(col_id_str, value, n)
                        .await
                        .context(PushIndexValueSnafu)?;
                }
                // fields
                None => {
                    let Some(values) = batch.field_col_value(*col_id) else {
                        debug!(
                            "Column {} not found in the batch during building inverted index",
                            col_id
                        );
                        continue;
                    };
                    // The sort field is derived from the column's data type once per batch.
                    let sort_field = SortField::new(values.data.data_type());
                    for i in 0..n {
                        // The buffer is reused across rows, so reset it before each value.
                        self.value_buf.clear();
                        let value = values.data.get_ref(i);
                        if value.is_null() {
                            // Null values are pushed as `None`.
                            self.index_creator
                                .push_with_name(col_id_str, None)
                                .await
                                .context(PushIndexValueSnafu)?;
                        } else {
                            IndexValueCodec::encode_nonnull_value(
                                value,
                                &sort_field,
                                &mut self.value_buf,
                            )
                            .context(EncodeSnafu)?;
                            self.index_creator
                                .push_with_name(col_id_str, Some(&self.value_buf))
                                .await
                                .context(PushIndexValueSnafu)?;
                        }
                    }
                }
            }
        }

        Ok(())
    }
256
257    /// Data flow of finishing index:
258    ///
259    /// ```text
260    ///                               (In Memory Buffer)
261    ///                                    ┌──────┐
262    ///  ┌─────────────┐                   │ PIPE │
263    ///  │             │ write index data  │      │
264    ///  │ IndexWriter ├──────────────────►│ tx   │
265    ///  │             │                   │      │
266    ///  └─────────────┘                   │      │
267    ///                  ┌─────────────────┤ rx   │
268    ///  ┌─────────────┐ │ read as blob    └──────┘
269    ///  │             │ │
270    ///  │ PuffinWriter├─┤
271    ///  │             │ │ copy to file    ┌──────┐
272    ///  └─────────────┘ └────────────────►│ File │
273    ///                                    └──────┘
274    /// ```
275    async fn do_finish(&mut self, puffin_writer: &mut SstPuffinWriter) -> Result<()> {
276        let mut guard = self.stats.record_finish();
277
278        let (tx, rx) = duplex(PIPE_BUFFER_SIZE_FOR_SENDING_BLOB);
279        let mut index_writer = InvertedIndexBlobWriter::new(tx.compat_write());
280
281        let (index_finish, puffin_add_blob) = futures::join!(
282            // TODO(zhongzc): config bitmap type
283            self.index_creator
284                .finish(&mut index_writer, index::bitmap::BitmapType::Roaring),
285            puffin_writer.put_blob(
286                INDEX_BLOB_TYPE,
287                rx.compat(),
288                PutOptions::default(),
289                Default::default(),
290            )
291        );
292
293        match (
294            puffin_add_blob.context(PuffinAddBlobSnafu),
295            index_finish.context(IndexFinishSnafu),
296        ) {
297            (Err(e1), Err(e2)) => BiErrorsSnafu {
298                first: Box::new(e1),
299                second: Box::new(e2),
300            }
301            .fail()?,
302
303            (Ok(_), e @ Err(_)) => e?,
304            (e @ Err(_), Ok(_)) => e.map(|_| ())?,
305            (Ok(written_bytes), Ok(_)) => {
306                guard.inc_byte_count(written_bytes);
307            }
308        }
309
310        Ok(())
311    }
312
313    async fn do_cleanup(&mut self) -> Result<()> {
314        let _guard = self.stats.record_cleanup();
315
316        self.temp_file_provider.cleanup().await
317    }
318
319    pub fn column_ids(&self) -> impl Iterator<Item = ColumnId> + '_ {
320        self.indexed_column_ids.iter().map(|(col_id, _)| *col_id)
321    }
322
323    pub fn memory_usage(&self) -> usize {
324        self.memory_usage.load(std::sync::atomic::Ordering::Relaxed)
325    }
326}
327
328#[cfg(test)]
329mod tests {
330    use std::collections::BTreeSet;
331
332    use api::v1::SemanticType;
333    use datafusion_expr::{binary_expr, col, lit, Expr as DfExpr, Operator};
334    use datatypes::data_type::ConcreteDataType;
335    use datatypes::schema::ColumnSchema;
336    use datatypes::value::ValueRef;
337    use datatypes::vectors::{UInt64Vector, UInt8Vector};
338    use futures::future::BoxFuture;
339    use mito_codec::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodecExt};
340    use object_store::services::Memory;
341    use object_store::ObjectStore;
342    use puffin::puffin_manager::cache::PuffinMetadataCache;
343    use puffin::puffin_manager::PuffinManager;
344    use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
345    use store_api::storage::RegionId;
346
347    use super::*;
348    use crate::access_layer::RegionFilePathFactory;
349    use crate::cache::index::inverted_index::InvertedIndexCache;
350    use crate::metrics::CACHE_BYTES;
351    use crate::read::BatchColumn;
352    use crate::sst::index::inverted_index::applier::builder::InvertedIndexApplierBuilder;
353    use crate::sst::index::puffin_manager::PuffinManagerFactory;
354
355    fn mock_object_store() -> ObjectStore {
356        ObjectStore::new(Memory::default()).unwrap().finish()
357    }
358
359    async fn new_intm_mgr(path: impl AsRef<str>) -> IntermediateManager {
360        IntermediateManager::init_fs(path).await.unwrap()
361    }
362
363    fn mock_region_metadata() -> RegionMetadataRef {
364        let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 2));
365        builder
366            .push_column_metadata(ColumnMetadata {
367                column_schema: ColumnSchema::new(
368                    "tag_str",
369                    ConcreteDataType::string_datatype(),
370                    false,
371                ),
372                semantic_type: SemanticType::Tag,
373                column_id: 1,
374            })
375            .push_column_metadata(ColumnMetadata {
376                column_schema: ColumnSchema::new(
377                    "tag_i32",
378                    ConcreteDataType::int32_datatype(),
379                    false,
380                ),
381                semantic_type: SemanticType::Tag,
382                column_id: 2,
383            })
384            .push_column_metadata(ColumnMetadata {
385                column_schema: ColumnSchema::new(
386                    "ts",
387                    ConcreteDataType::timestamp_millisecond_datatype(),
388                    false,
389                ),
390                semantic_type: SemanticType::Timestamp,
391                column_id: 3,
392            })
393            .push_column_metadata(ColumnMetadata {
394                column_schema: ColumnSchema::new(
395                    "field_u64",
396                    ConcreteDataType::uint64_datatype(),
397                    false,
398                ),
399                semantic_type: SemanticType::Field,
400                column_id: 4,
401            })
402            .primary_key(vec![1, 2]);
403
404        Arc::new(builder.build().unwrap())
405    }
406
407    fn new_batch(
408        str_tag: impl AsRef<str>,
409        i32_tag: impl Into<i32>,
410        u64_field: impl IntoIterator<Item = u64>,
411    ) -> Batch {
412        let fields = vec![
413            (0, SortField::new(ConcreteDataType::string_datatype())),
414            (1, SortField::new(ConcreteDataType::int32_datatype())),
415        ];
416        let codec = DensePrimaryKeyCodec::with_fields(fields);
417        let row: [ValueRef; 2] = [str_tag.as_ref().into(), i32_tag.into().into()];
418        let primary_key = codec.encode(row.into_iter()).unwrap();
419
420        let u64_field = BatchColumn {
421            column_id: 4,
422            data: Arc::new(UInt64Vector::from_iter_values(u64_field)),
423        };
424        let num_rows = u64_field.data.len();
425
426        Batch::new(
427            primary_key,
428            Arc::new(UInt64Vector::from_iter_values(std::iter::repeat_n(
429                0, num_rows,
430            ))),
431            Arc::new(UInt64Vector::from_iter_values(std::iter::repeat_n(
432                0, num_rows,
433            ))),
434            Arc::new(UInt8Vector::from_iter_values(std::iter::repeat_n(
435                1, num_rows,
436            ))),
437            vec![u64_field],
438        )
439        .unwrap()
440    }
441
442    async fn build_applier_factory(
443        prefix: &str,
444        rows: BTreeSet<(&'static str, i32, [u64; 2])>,
445    ) -> impl Fn(DfExpr) -> BoxFuture<'static, Vec<usize>> {
446        let (d, factory) = PuffinManagerFactory::new_for_test_async(prefix).await;
447        let region_dir = "region0".to_string();
448        let sst_file_id = FileId::random();
449        let object_store = mock_object_store();
450        let region_metadata = mock_region_metadata();
451        let intm_mgr = new_intm_mgr(d.path().to_string_lossy()).await;
452        let memory_threshold = None;
453        let segment_row_count = 2;
454        let indexed_column_ids = HashSet::from_iter([1, 2, 4]);
455
456        let mut creator = InvertedIndexer::new(
457            sst_file_id,
458            &region_metadata,
459            intm_mgr,
460            memory_threshold,
461            NonZeroUsize::new(segment_row_count).unwrap(),
462            indexed_column_ids.clone(),
463        );
464
465        for (str_tag, i32_tag, u64_field) in &rows {
466            let mut batch = new_batch(str_tag, *i32_tag, u64_field.iter().copied());
467            creator.update(&mut batch).await.unwrap();
468        }
469
470        let puffin_manager = factory.build(
471            object_store.clone(),
472            RegionFilePathFactory::new(region_dir.clone()),
473        );
474        let mut writer = puffin_manager.writer(&sst_file_id).await.unwrap();
475        let (row_count, _) = creator.finish(&mut writer).await.unwrap();
476        assert_eq!(row_count, rows.len() * segment_row_count);
477        writer.finish().await.unwrap();
478
479        move |expr| {
480            let _d = &d;
481            let cache = Arc::new(InvertedIndexCache::new(10, 10, 100));
482            let puffin_metadata_cache = Arc::new(PuffinMetadataCache::new(10, &CACHE_BYTES));
483            let applier = InvertedIndexApplierBuilder::new(
484                region_dir.clone(),
485                object_store.clone(),
486                &region_metadata,
487                indexed_column_ids.clone(),
488                factory.clone(),
489            )
490            .with_inverted_index_cache(Some(cache))
491            .with_puffin_metadata_cache(Some(puffin_metadata_cache))
492            .build(&[expr])
493            .unwrap()
494            .unwrap();
495            Box::pin(async move {
496                applier
497                    .apply(sst_file_id, None)
498                    .await
499                    .unwrap()
500                    .matched_segment_ids
501                    .iter_ones()
502                    .collect()
503            })
504        }
505    }
506
507    #[tokio::test]
508    async fn test_create_and_query_get_key() {
509        let rows = BTreeSet::from_iter([
510            ("aaa", 1, [1, 2]),
511            ("aaa", 2, [2, 3]),
512            ("aaa", 3, [3, 4]),
513            ("aab", 1, [4, 5]),
514            ("aab", 2, [5, 6]),
515            ("aab", 3, [6, 7]),
516            ("abc", 1, [7, 8]),
517            ("abc", 2, [8, 9]),
518            ("abc", 3, [9, 10]),
519        ]);
520
521        let applier_factory = build_applier_factory("test_create_and_query_get_key_", rows).await;
522
523        let expr = col("tag_str").eq(lit("aaa"));
524        let res = applier_factory(expr).await;
525        assert_eq!(res, vec![0, 1, 2]);
526
527        let expr = col("tag_i32").eq(lit(2));
528        let res = applier_factory(expr).await;
529        assert_eq!(res, vec![1, 4, 7]);
530
531        let expr = col("tag_str").eq(lit("aaa")).and(col("tag_i32").eq(lit(2)));
532        let res = applier_factory(expr).await;
533        assert_eq!(res, vec![1]);
534
535        let expr = col("tag_str")
536            .eq(lit("aaa"))
537            .or(col("tag_str").eq(lit("abc")));
538        let res = applier_factory(expr).await;
539        assert_eq!(res, vec![0, 1, 2, 6, 7, 8]);
540
541        let expr = col("tag_str").in_list(vec![lit("aaa"), lit("abc")], false);
542        let res = applier_factory(expr).await;
543        assert_eq!(res, vec![0, 1, 2, 6, 7, 8]);
544
545        let expr = col("field_u64").eq(lit(2u64));
546        let res = applier_factory(expr).await;
547        assert_eq!(res, vec![0, 1]);
548    }
549
550    #[tokio::test]
551    async fn test_create_and_query_range() {
552        let rows = BTreeSet::from_iter([
553            ("aaa", 1, [1, 2]),
554            ("aaa", 2, [2, 3]),
555            ("aaa", 3, [3, 4]),
556            ("aab", 1, [4, 5]),
557            ("aab", 2, [5, 6]),
558            ("aab", 3, [6, 7]),
559            ("abc", 1, [7, 8]),
560            ("abc", 2, [8, 9]),
561            ("abc", 3, [9, 10]),
562        ]);
563
564        let applier_factory = build_applier_factory("test_create_and_query_range_", rows).await;
565
566        let expr = col("tag_str").between(lit("aaa"), lit("aab"));
567        let res = applier_factory(expr).await;
568        assert_eq!(res, vec![0, 1, 2, 3, 4, 5]);
569
570        let expr = col("tag_i32").between(lit(2), lit(3));
571        let res = applier_factory(expr).await;
572        assert_eq!(res, vec![1, 2, 4, 5, 7, 8]);
573
574        let expr = col("tag_str").between(lit("aaa"), lit("aaa"));
575        let res = applier_factory(expr).await;
576        assert_eq!(res, vec![0, 1, 2]);
577
578        let expr = col("tag_i32").between(lit(2), lit(2));
579        let res = applier_factory(expr).await;
580        assert_eq!(res, vec![1, 4, 7]);
581
582        let expr = col("field_u64").between(lit(2u64), lit(5u64));
583        let res = applier_factory(expr).await;
584        assert_eq!(res, vec![0, 1, 2, 3, 4]);
585    }
586
587    #[tokio::test]
588    async fn test_create_and_query_comparison() {
589        let rows = BTreeSet::from_iter([
590            ("aaa", 1, [1, 2]),
591            ("aaa", 2, [2, 3]),
592            ("aaa", 3, [3, 4]),
593            ("aab", 1, [4, 5]),
594            ("aab", 2, [5, 6]),
595            ("aab", 3, [6, 7]),
596            ("abc", 1, [7, 8]),
597            ("abc", 2, [8, 9]),
598            ("abc", 3, [9, 10]),
599        ]);
600
601        let applier_factory =
602            build_applier_factory("test_create_and_query_comparison_", rows).await;
603
604        let expr = col("tag_str").lt(lit("aab"));
605        let res = applier_factory(expr).await;
606        assert_eq!(res, vec![0, 1, 2]);
607
608        let expr = col("tag_i32").lt(lit(2));
609        let res = applier_factory(expr).await;
610        assert_eq!(res, vec![0, 3, 6]);
611
612        let expr = col("field_u64").lt(lit(2u64));
613        let res = applier_factory(expr).await;
614        assert_eq!(res, vec![0]);
615
616        let expr = col("tag_str").gt(lit("aab"));
617        let res = applier_factory(expr).await;
618        assert_eq!(res, vec![6, 7, 8]);
619
620        let expr = col("tag_i32").gt(lit(2));
621        let res = applier_factory(expr).await;
622        assert_eq!(res, vec![2, 5, 8]);
623
624        let expr = col("field_u64").gt(lit(8u64));
625        let res = applier_factory(expr).await;
626        assert_eq!(res, vec![7, 8]);
627
628        let expr = col("tag_str").lt_eq(lit("aab"));
629        let res = applier_factory(expr).await;
630        assert_eq!(res, vec![0, 1, 2, 3, 4, 5]);
631
632        let expr = col("tag_i32").lt_eq(lit(2));
633        let res = applier_factory(expr).await;
634        assert_eq!(res, vec![0, 1, 3, 4, 6, 7]);
635
636        let expr = col("field_u64").lt_eq(lit(2u64));
637        let res = applier_factory(expr).await;
638        assert_eq!(res, vec![0, 1]);
639
640        let expr = col("tag_str").gt_eq(lit("aab"));
641        let res = applier_factory(expr).await;
642        assert_eq!(res, vec![3, 4, 5, 6, 7, 8]);
643
644        let expr = col("tag_i32").gt_eq(lit(2));
645        let res = applier_factory(expr).await;
646        assert_eq!(res, vec![1, 2, 4, 5, 7, 8]);
647
648        let expr = col("field_u64").gt_eq(lit(8u64));
649        let res = applier_factory(expr).await;
650        assert_eq!(res, vec![6, 7, 8]);
651
652        let expr = col("tag_str")
653            .gt(lit("aaa"))
654            .and(col("tag_str").lt(lit("abc")));
655        let res = applier_factory(expr).await;
656        assert_eq!(res, vec![3, 4, 5]);
657
658        let expr = col("tag_i32").gt(lit(1)).and(col("tag_i32").lt(lit(3)));
659        let res = applier_factory(expr).await;
660        assert_eq!(res, vec![1, 4, 7]);
661
662        let expr = col("field_u64")
663            .gt(lit(2u64))
664            .and(col("field_u64").lt(lit(9u64)));
665        let res = applier_factory(expr).await;
666        assert_eq!(res, vec![1, 2, 3, 4, 5, 6, 7]);
667    }
668
669    #[tokio::test]
670    async fn test_create_and_query_regex() {
671        let rows = BTreeSet::from_iter([
672            ("aaa", 1, [1, 2]),
673            ("aaa", 2, [2, 3]),
674            ("aaa", 3, [3, 4]),
675            ("aab", 1, [4, 5]),
676            ("aab", 2, [5, 6]),
677            ("aab", 3, [6, 7]),
678            ("abc", 1, [7, 8]),
679            ("abc", 2, [8, 9]),
680            ("abc", 3, [9, 10]),
681        ]);
682
683        let applier_factory = build_applier_factory("test_create_and_query_regex_", rows).await;
684
685        let expr = binary_expr(col("tag_str"), Operator::RegexMatch, lit(".*"));
686        let res = applier_factory(expr).await;
687        assert_eq!(res, vec![0, 1, 2, 3, 4, 5, 6, 7, 8]);
688
689        let expr = binary_expr(col("tag_str"), Operator::RegexMatch, lit("a.*c"));
690        let res = applier_factory(expr).await;
691        assert_eq!(res, vec![6, 7, 8]);
692
693        let expr = binary_expr(col("tag_str"), Operator::RegexMatch, lit("a.*b$"));
694        let res = applier_factory(expr).await;
695        assert_eq!(res, vec![3, 4, 5]);
696
697        let expr = binary_expr(col("tag_str"), Operator::RegexMatch, lit("\\w"));
698        let res = applier_factory(expr).await;
699        assert_eq!(res, vec![0, 1, 2, 3, 4, 5, 6, 7, 8]);
700
701        let expr = binary_expr(col("tag_str"), Operator::RegexMatch, lit("\\d"));
702        let res = applier_factory(expr).await;
703        assert!(res.is_empty());
704
705        let expr = binary_expr(col("tag_str"), Operator::RegexMatch, lit("^aaa$"));
706        let res = applier_factory(expr).await;
707        assert_eq!(res, vec![0, 1, 2]);
708    }
709}