mito2/sst/index/inverted_index/
creator.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashSet;
16use std::num::NonZeroUsize;
17use std::sync::atomic::AtomicUsize;
18use std::sync::Arc;
19
20use common_telemetry::{debug, warn};
21use index::inverted_index::create::sort::external_sort::ExternalSorter;
22use index::inverted_index::create::sort_create::SortIndexCreator;
23use index::inverted_index::create::InvertedIndexCreator;
24use index::inverted_index::format::writer::InvertedIndexBlobWriter;
25use mito_codec::index::{IndexValueCodec, IndexValuesCodec};
26use mito_codec::row_converter::SortField;
27use puffin::puffin_manager::{PuffinWriter, PutOptions};
28use snafu::{ensure, ResultExt};
29use store_api::metadata::RegionMetadataRef;
30use store_api::storage::ColumnId;
31use tokio::io::duplex;
32use tokio_util::compat::{TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt};
33
34use crate::error::{
35    BiErrorsSnafu, EncodeSnafu, IndexFinishSnafu, OperateAbortedIndexSnafu, PuffinAddBlobSnafu,
36    PushIndexValueSnafu, Result,
37};
38use crate::read::Batch;
39use crate::sst::file::FileId;
40use crate::sst::index::intermediate::{
41    IntermediateLocation, IntermediateManager, TempFileProvider,
42};
43use crate::sst::index::inverted_index::INDEX_BLOB_TYPE;
44use crate::sst::index::puffin_manager::SstPuffinWriter;
45use crate::sst::index::statistics::{ByteCount, RowCount, Statistics};
46use crate::sst::index::TYPE_INVERTED_INDEX;
47
/// The minimum memory usage threshold for one column.
/// Passed to `ExternalSorter::factory` as the per-column floor below which
/// the sorter will not spill to intermediate files.
const MIN_MEMORY_USAGE_THRESHOLD_PER_COLUMN: usize = 1024 * 1024; // 1MB

/// The buffer size for the pipe used to send index data to the puffin blob.
/// Used as the capacity of the in-memory `duplex` channel in `do_finish`.
const PIPE_BUFFER_SIZE_FOR_SENDING_BLOB: usize = 8192;
53
/// `InvertedIndexer` creates inverted index for SST files.
pub struct InvertedIndexer {
    /// The index creator that values are pushed into and that writes the blob.
    index_creator: Box<dyn InvertedIndexCreator>,
    /// The provider of intermediate files used by the external sorter;
    /// cleaned up after finish/abort.
    temp_file_provider: Arc<TempFileProvider>,

    /// Codec for decoding primary keys.
    codec: IndexValuesCodec,
    /// Reusable buffer for encoding index values (cleared before each encode).
    value_buf: Vec<u8>,

    /// Statistics of index creation (row count, byte count, timings).
    stats: Statistics,
    /// Whether the index creation is aborted. Once set, `update` and `finish`
    /// bail out with `OperateAbortedIndexSnafu`.
    aborted: bool,

    /// The memory usage of the index creator, shared with the external sorter.
    memory_usage: Arc<AtomicUsize>,

    /// Ids of indexed columns and their names (`to_string` of the column id).
    /// The string form is used as the index name when pushing values.
    indexed_column_ids: Vec<(ColumnId, String)>,
}
77
78impl InvertedIndexer {
79    /// Creates a new `InvertedIndexer`.
80    /// Should ensure that the number of tag columns is greater than 0.
81    pub fn new(
82        sst_file_id: FileId,
83        metadata: &RegionMetadataRef,
84        intermediate_manager: IntermediateManager,
85        memory_usage_threshold: Option<usize>,
86        segment_row_count: NonZeroUsize,
87        indexed_column_ids: HashSet<ColumnId>,
88    ) -> Self {
89        let temp_file_provider = Arc::new(TempFileProvider::new(
90            IntermediateLocation::new(&metadata.region_id, &sst_file_id),
91            intermediate_manager,
92        ));
93
94        let memory_usage = Arc::new(AtomicUsize::new(0));
95
96        let sorter = ExternalSorter::factory(
97            temp_file_provider.clone() as _,
98            Some(MIN_MEMORY_USAGE_THRESHOLD_PER_COLUMN),
99            memory_usage.clone(),
100            memory_usage_threshold,
101        );
102        let index_creator = Box::new(SortIndexCreator::new(sorter, segment_row_count));
103
104        let codec = IndexValuesCodec::from_tag_columns(
105            metadata.primary_key_encoding,
106            metadata.primary_key_columns(),
107        );
108        let indexed_column_ids = indexed_column_ids
109            .into_iter()
110            .map(|col_id| {
111                let col_id_str = col_id.to_string();
112                (col_id, col_id_str)
113            })
114            .collect();
115        Self {
116            codec,
117            index_creator,
118            temp_file_provider,
119            value_buf: vec![],
120            stats: Statistics::new(TYPE_INVERTED_INDEX),
121            aborted: false,
122            memory_usage,
123            indexed_column_ids,
124        }
125    }
126
127    /// Updates index with a batch of rows.
128    /// Garbage will be cleaned up if failed to update.
129    pub async fn update(&mut self, batch: &mut Batch) -> Result<()> {
130        ensure!(!self.aborted, OperateAbortedIndexSnafu);
131
132        if batch.is_empty() {
133            return Ok(());
134        }
135
136        if let Err(update_err) = self.do_update(batch).await {
137            // clean up garbage if failed to update
138            if let Err(err) = self.do_cleanup().await {
139                if cfg!(any(test, feature = "test")) {
140                    panic!("Failed to clean up index creator, err: {err}",);
141                } else {
142                    warn!(err; "Failed to clean up index creator");
143                }
144            }
145            return Err(update_err);
146        }
147
148        Ok(())
149    }
150
151    /// Finishes index creation and cleans up garbage.
152    /// Returns the number of rows and bytes written.
153    pub async fn finish(
154        &mut self,
155        puffin_writer: &mut SstPuffinWriter,
156    ) -> Result<(RowCount, ByteCount)> {
157        ensure!(!self.aborted, OperateAbortedIndexSnafu);
158
159        if self.stats.row_count() == 0 {
160            // no IO is performed, no garbage to clean up, just return
161            return Ok((0, 0));
162        }
163
164        let finish_res = self.do_finish(puffin_writer).await;
165        // clean up garbage no matter finish successfully or not
166        if let Err(err) = self.do_cleanup().await {
167            if cfg!(any(test, feature = "test")) {
168                panic!("Failed to clean up index creator, err: {err}",);
169            } else {
170                warn!(err; "Failed to clean up index creator");
171            }
172        }
173
174        finish_res.map(|_| (self.stats.row_count(), self.stats.byte_count()))
175    }
176
177    /// Aborts index creation and clean up garbage.
178    pub async fn abort(&mut self) -> Result<()> {
179        if self.aborted {
180            return Ok(());
181        }
182        self.aborted = true;
183
184        self.do_cleanup().await
185    }
186
187    async fn do_update(&mut self, batch: &mut Batch) -> Result<()> {
188        let mut guard = self.stats.record_update();
189
190        let n = batch.num_rows();
191        guard.inc_row_count(n);
192
193        for (col_id, col_id_str) in &self.indexed_column_ids {
194            match self.codec.pk_col_info(*col_id) {
195                // pk
196                Some(col_info) => {
197                    let pk_idx = col_info.idx;
198                    let field = &col_info.field;
199                    let value = batch
200                        .pk_col_value(self.codec.decoder(), pk_idx, *col_id)?
201                        .filter(|v| !v.is_null())
202                        .map(|v| {
203                            self.value_buf.clear();
204                            IndexValueCodec::encode_nonnull_value(
205                                v.as_value_ref(),
206                                field,
207                                &mut self.value_buf,
208                            )
209                            .context(EncodeSnafu)?;
210                            Ok(self.value_buf.as_slice())
211                        })
212                        .transpose()?;
213
214                    self.index_creator
215                        .push_with_name_n(col_id_str, value, n)
216                        .await
217                        .context(PushIndexValueSnafu)?;
218                }
219                // fields
220                None => {
221                    let Some(values) = batch.field_col_value(*col_id) else {
222                        debug!(
223                            "Column {} not found in the batch during building inverted index",
224                            col_id
225                        );
226                        continue;
227                    };
228                    let sort_field = SortField::new(values.data.data_type());
229                    for i in 0..n {
230                        self.value_buf.clear();
231                        let value = values.data.get_ref(i);
232                        if value.is_null() {
233                            self.index_creator
234                                .push_with_name(col_id_str, None)
235                                .await
236                                .context(PushIndexValueSnafu)?;
237                        } else {
238                            IndexValueCodec::encode_nonnull_value(
239                                value,
240                                &sort_field,
241                                &mut self.value_buf,
242                            )
243                            .context(EncodeSnafu)?;
244                            self.index_creator
245                                .push_with_name(col_id_str, Some(&self.value_buf))
246                                .await
247                                .context(PushIndexValueSnafu)?;
248                        }
249                    }
250                }
251            }
252        }
253
254        Ok(())
255    }
256
257    /// Data flow of finishing index:
258    ///
259    /// ```text
260    ///                               (In Memory Buffer)
261    ///                                    ┌──────┐
262    ///  ┌─────────────┐                   │ PIPE │
263    ///  │             │ write index data  │      │
264    ///  │ IndexWriter ├──────────────────►│ tx   │
265    ///  │             │                   │      │
266    ///  └─────────────┘                   │      │
267    ///                  ┌─────────────────┤ rx   │
268    ///  ┌─────────────┐ │ read as blob    └──────┘
269    ///  │             │ │
270    ///  │ PuffinWriter├─┤
271    ///  │             │ │ copy to file    ┌──────┐
272    ///  └─────────────┘ └────────────────►│ File │
273    ///                                    └──────┘
274    /// ```
275    async fn do_finish(&mut self, puffin_writer: &mut SstPuffinWriter) -> Result<()> {
276        let mut guard = self.stats.record_finish();
277
278        let (tx, rx) = duplex(PIPE_BUFFER_SIZE_FOR_SENDING_BLOB);
279        let mut index_writer = InvertedIndexBlobWriter::new(tx.compat_write());
280
281        let (index_finish, puffin_add_blob) = futures::join!(
282            // TODO(zhongzc): config bitmap type
283            self.index_creator
284                .finish(&mut index_writer, index::bitmap::BitmapType::Roaring),
285            puffin_writer.put_blob(
286                INDEX_BLOB_TYPE,
287                rx.compat(),
288                PutOptions::default(),
289                Default::default(),
290            )
291        );
292
293        match (
294            puffin_add_blob.context(PuffinAddBlobSnafu),
295            index_finish.context(IndexFinishSnafu),
296        ) {
297            (Err(e1), Err(e2)) => BiErrorsSnafu {
298                first: Box::new(e1),
299                second: Box::new(e2),
300            }
301            .fail()?,
302
303            (Ok(_), e @ Err(_)) => e?,
304            (e @ Err(_), Ok(_)) => e.map(|_| ())?,
305            (Ok(written_bytes), Ok(_)) => {
306                guard.inc_byte_count(written_bytes);
307            }
308        }
309
310        Ok(())
311    }
312
313    async fn do_cleanup(&mut self) -> Result<()> {
314        let _guard = self.stats.record_cleanup();
315
316        self.temp_file_provider.cleanup().await
317    }
318
319    pub fn column_ids(&self) -> impl Iterator<Item = ColumnId> + '_ {
320        self.indexed_column_ids.iter().map(|(col_id, _)| *col_id)
321    }
322
323    pub fn memory_usage(&self) -> usize {
324        self.memory_usage.load(std::sync::atomic::Ordering::Relaxed)
325    }
326}
327
#[cfg(test)]
mod tests {
    use std::collections::BTreeSet;

    use api::v1::SemanticType;
    use datafusion_expr::{binary_expr, col, lit, Expr as DfExpr, Operator};
    use datatypes::data_type::ConcreteDataType;
    use datatypes::schema::ColumnSchema;
    use datatypes::value::ValueRef;
    use datatypes::vectors::{UInt64Vector, UInt8Vector};
    use futures::future::BoxFuture;
    use mito_codec::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodecExt};
    use object_store::services::Memory;
    use object_store::ObjectStore;
    use puffin::puffin_manager::cache::PuffinMetadataCache;
    use puffin::puffin_manager::PuffinManager;
    use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
    use store_api::region_request::PathType;
    use store_api::storage::RegionId;

    use super::*;
    use crate::access_layer::RegionFilePathFactory;
    use crate::cache::index::inverted_index::InvertedIndexCache;
    use crate::metrics::CACHE_BYTES;
    use crate::read::BatchColumn;
    use crate::sst::file::RegionFileId;
    use crate::sst::index::inverted_index::applier::builder::InvertedIndexApplierBuilder;
    use crate::sst::index::puffin_manager::PuffinManagerFactory;

    /// Returns an in-memory object store for tests.
    fn mock_object_store() -> ObjectStore {
        ObjectStore::new(Memory::default()).unwrap().finish()
    }

    /// Creates a filesystem-backed intermediate manager rooted at `path`.
    async fn new_intm_mgr(path: impl AsRef<str>) -> IntermediateManager {
        IntermediateManager::init_fs(path).await.unwrap()
    }

    /// Builds region metadata with two tags (`tag_str`, `tag_i32`), a
    /// timestamp column `ts`, and one field `field_u64`.
    /// Primary key is (tag_str, tag_i32) with column ids (1, 2).
    fn mock_region_metadata() -> RegionMetadataRef {
        let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 2));
        builder
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "tag_str",
                    ConcreteDataType::string_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Tag,
                column_id: 1,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "tag_i32",
                    ConcreteDataType::int32_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Tag,
                column_id: 2,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "ts",
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Timestamp,
                column_id: 3,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "field_u64",
                    ConcreteDataType::uint64_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Field,
                column_id: 4,
            })
            .primary_key(vec![1, 2]);

        Arc::new(builder.build().unwrap())
    }

    /// Builds a `Batch` with one primary key (str_tag, i32_tag) and as many
    /// rows as `u64_field` yields; timestamps/sequences are zero-filled and
    /// op types set to 1.
    fn new_batch(
        str_tag: impl AsRef<str>,
        i32_tag: impl Into<i32>,
        u64_field: impl IntoIterator<Item = u64>,
    ) -> Batch {
        let fields = vec![
            (0, SortField::new(ConcreteDataType::string_datatype())),
            (1, SortField::new(ConcreteDataType::int32_datatype())),
        ];
        let codec = DensePrimaryKeyCodec::with_fields(fields);
        let row: [ValueRef; 2] = [str_tag.as_ref().into(), i32_tag.into().into()];
        let primary_key = codec.encode(row.into_iter()).unwrap();

        let u64_field = BatchColumn {
            column_id: 4,
            data: Arc::new(UInt64Vector::from_iter_values(u64_field)),
        };
        let num_rows = u64_field.data.len();

        Batch::new(
            primary_key,
            Arc::new(UInt64Vector::from_iter_values(std::iter::repeat_n(
                0, num_rows,
            ))),
            Arc::new(UInt64Vector::from_iter_values(std::iter::repeat_n(
                0, num_rows,
            ))),
            Arc::new(UInt8Vector::from_iter_values(std::iter::repeat_n(
                1, num_rows,
            ))),
            vec![u64_field],
        )
        .unwrap()
    }

    /// Builds the index from `rows` (each row: str tag, i32 tag, two u64
    /// field values, i.e. one 2-row segment per entry), writes it to a puffin
    /// file, and returns a closure that applies a DataFusion expression and
    /// resolves to the matched segment ids.
    async fn build_applier_factory(
        prefix: &str,
        rows: BTreeSet<(&'static str, i32, [u64; 2])>,
    ) -> impl Fn(DfExpr) -> BoxFuture<'static, Vec<usize>> {
        let (d, factory) = PuffinManagerFactory::new_for_test_async(prefix).await;
        let table_dir = "table0".to_string();
        let sst_file_id = FileId::random();
        let object_store = mock_object_store();
        let region_metadata = mock_region_metadata();
        let intm_mgr = new_intm_mgr(d.path().to_string_lossy()).await;
        let memory_threshold = None;
        let segment_row_count = 2;
        // Index both tags (1, 2) and the field column (4).
        let indexed_column_ids = HashSet::from_iter([1, 2, 4]);

        let mut creator = InvertedIndexer::new(
            sst_file_id,
            &region_metadata,
            intm_mgr,
            memory_threshold,
            NonZeroUsize::new(segment_row_count).unwrap(),
            indexed_column_ids.clone(),
        );

        for (str_tag, i32_tag, u64_field) in &rows {
            let mut batch = new_batch(str_tag, *i32_tag, u64_field.iter().copied());
            creator.update(&mut batch).await.unwrap();
        }

        let puffin_manager = factory.build(
            object_store.clone(),
            RegionFilePathFactory::new(table_dir.clone(), PathType::Bare),
        );

        let sst_file_id = RegionFileId::new(region_metadata.region_id, sst_file_id);
        let mut writer = puffin_manager.writer(&sst_file_id).await.unwrap();
        let (row_count, _) = creator.finish(&mut writer).await.unwrap();
        // Each input row becomes `segment_row_count` physical rows.
        assert_eq!(row_count, rows.len() * segment_row_count);
        writer.finish().await.unwrap();

        move |expr| {
            // Keep the temp dir alive for the lifetime of the closure.
            let _d = &d;
            let cache = Arc::new(InvertedIndexCache::new(10, 10, 100));
            let puffin_metadata_cache = Arc::new(PuffinMetadataCache::new(10, &CACHE_BYTES));
            let applier = InvertedIndexApplierBuilder::new(
                table_dir.clone(),
                PathType::Bare,
                object_store.clone(),
                &region_metadata,
                indexed_column_ids.clone(),
                factory.clone(),
            )
            .with_inverted_index_cache(Some(cache))
            .with_puffin_metadata_cache(Some(puffin_metadata_cache))
            .build(&[expr])
            .unwrap()
            .unwrap();
            Box::pin(async move {
                applier
                    .apply(sst_file_id, None)
                    .await
                    .unwrap()
                    .matched_segment_ids
                    .iter_ones()
                    .collect()
            })
        }
    }

    /// Equality and IN-list predicates over tags and field, including AND/OR
    /// combinations.
    #[tokio::test]
    async fn test_create_and_query_get_key() {
        let rows = BTreeSet::from_iter([
            ("aaa", 1, [1, 2]),
            ("aaa", 2, [2, 3]),
            ("aaa", 3, [3, 4]),
            ("aab", 1, [4, 5]),
            ("aab", 2, [5, 6]),
            ("aab", 3, [6, 7]),
            ("abc", 1, [7, 8]),
            ("abc", 2, [8, 9]),
            ("abc", 3, [9, 10]),
        ]);

        let applier_factory = build_applier_factory("test_create_and_query_get_key_", rows).await;

        let expr = col("tag_str").eq(lit("aaa"));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![0, 1, 2]);

        let expr = col("tag_i32").eq(lit(2));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![1, 4, 7]);

        let expr = col("tag_str").eq(lit("aaa")).and(col("tag_i32").eq(lit(2)));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![1]);

        let expr = col("tag_str")
            .eq(lit("aaa"))
            .or(col("tag_str").eq(lit("abc")));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![0, 1, 2, 6, 7, 8]);

        let expr = col("tag_str").in_list(vec![lit("aaa"), lit("abc")], false);
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![0, 1, 2, 6, 7, 8]);

        let expr = col("field_u64").eq(lit(2u64));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![0, 1]);
    }

    /// BETWEEN (range) predicates over tags and field, including degenerate
    /// single-value ranges.
    #[tokio::test]
    async fn test_create_and_query_range() {
        let rows = BTreeSet::from_iter([
            ("aaa", 1, [1, 2]),
            ("aaa", 2, [2, 3]),
            ("aaa", 3, [3, 4]),
            ("aab", 1, [4, 5]),
            ("aab", 2, [5, 6]),
            ("aab", 3, [6, 7]),
            ("abc", 1, [7, 8]),
            ("abc", 2, [8, 9]),
            ("abc", 3, [9, 10]),
        ]);

        let applier_factory = build_applier_factory("test_create_and_query_range_", rows).await;

        let expr = col("tag_str").between(lit("aaa"), lit("aab"));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![0, 1, 2, 3, 4, 5]);

        let expr = col("tag_i32").between(lit(2), lit(3));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![1, 2, 4, 5, 7, 8]);

        let expr = col("tag_str").between(lit("aaa"), lit("aaa"));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![0, 1, 2]);

        let expr = col("tag_i32").between(lit(2), lit(2));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![1, 4, 7]);

        let expr = col("field_u64").between(lit(2u64), lit(5u64));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![0, 1, 2, 3, 4]);
    }

    /// Strict and non-strict comparison predicates (<, >, <=, >=) plus ANDed
    /// open ranges.
    #[tokio::test]
    async fn test_create_and_query_comparison() {
        let rows = BTreeSet::from_iter([
            ("aaa", 1, [1, 2]),
            ("aaa", 2, [2, 3]),
            ("aaa", 3, [3, 4]),
            ("aab", 1, [4, 5]),
            ("aab", 2, [5, 6]),
            ("aab", 3, [6, 7]),
            ("abc", 1, [7, 8]),
            ("abc", 2, [8, 9]),
            ("abc", 3, [9, 10]),
        ]);

        let applier_factory =
            build_applier_factory("test_create_and_query_comparison_", rows).await;

        let expr = col("tag_str").lt(lit("aab"));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![0, 1, 2]);

        let expr = col("tag_i32").lt(lit(2));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![0, 3, 6]);

        let expr = col("field_u64").lt(lit(2u64));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![0]);

        let expr = col("tag_str").gt(lit("aab"));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![6, 7, 8]);

        let expr = col("tag_i32").gt(lit(2));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![2, 5, 8]);

        let expr = col("field_u64").gt(lit(8u64));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![7, 8]);

        let expr = col("tag_str").lt_eq(lit("aab"));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![0, 1, 2, 3, 4, 5]);

        let expr = col("tag_i32").lt_eq(lit(2));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![0, 1, 3, 4, 6, 7]);

        let expr = col("field_u64").lt_eq(lit(2u64));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![0, 1]);

        let expr = col("tag_str").gt_eq(lit("aab"));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![3, 4, 5, 6, 7, 8]);

        let expr = col("tag_i32").gt_eq(lit(2));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![1, 2, 4, 5, 7, 8]);

        let expr = col("field_u64").gt_eq(lit(8u64));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![6, 7, 8]);

        let expr = col("tag_str")
            .gt(lit("aaa"))
            .and(col("tag_str").lt(lit("abc")));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![3, 4, 5]);

        let expr = col("tag_i32").gt(lit(1)).and(col("tag_i32").lt(lit(3)));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![1, 4, 7]);

        let expr = col("field_u64")
            .gt(lit(2u64))
            .and(col("field_u64").lt(lit(9u64)));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![1, 2, 3, 4, 5, 6, 7]);
    }

    /// Regex-match predicates on the string tag, including anchors, classes,
    /// and a pattern matching nothing.
    #[tokio::test]
    async fn test_create_and_query_regex() {
        let rows = BTreeSet::from_iter([
            ("aaa", 1, [1, 2]),
            ("aaa", 2, [2, 3]),
            ("aaa", 3, [3, 4]),
            ("aab", 1, [4, 5]),
            ("aab", 2, [5, 6]),
            ("aab", 3, [6, 7]),
            ("abc", 1, [7, 8]),
            ("abc", 2, [8, 9]),
            ("abc", 3, [9, 10]),
        ]);

        let applier_factory = build_applier_factory("test_create_and_query_regex_", rows).await;

        let expr = binary_expr(col("tag_str"), Operator::RegexMatch, lit(".*"));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![0, 1, 2, 3, 4, 5, 6, 7, 8]);

        let expr = binary_expr(col("tag_str"), Operator::RegexMatch, lit("a.*c"));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![6, 7, 8]);

        let expr = binary_expr(col("tag_str"), Operator::RegexMatch, lit("a.*b$"));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![3, 4, 5]);

        let expr = binary_expr(col("tag_str"), Operator::RegexMatch, lit("\\w"));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![0, 1, 2, 3, 4, 5, 6, 7, 8]);

        let expr = binary_expr(col("tag_str"), Operator::RegexMatch, lit("\\d"));
        let res = applier_factory(expr).await;
        assert!(res.is_empty());

        let expr = binary_expr(col("tag_str"), Operator::RegexMatch, lit("^aaa$"));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![0, 1, 2]);
    }
}