mito2/sst/index/inverted_index/
creator.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashSet;
16use std::num::NonZeroUsize;
17use std::sync::Arc;
18use std::sync::atomic::AtomicUsize;
19
20use common_telemetry::{debug, warn};
21use datatypes::arrow::record_batch::RecordBatch;
22use datatypes::vectors::Helper;
23use index::inverted_index::create::InvertedIndexCreator;
24use index::inverted_index::create::sort::external_sort::ExternalSorter;
25use index::inverted_index::create::sort_create::SortIndexCreator;
26use index::inverted_index::format::writer::InvertedIndexBlobWriter;
27use mito_codec::index::{IndexValueCodec, IndexValuesCodec};
28use mito_codec::row_converter::SortField;
29use puffin::puffin_manager::{PuffinWriter, PutOptions};
30use snafu::{ResultExt, ensure};
31use store_api::metadata::RegionMetadataRef;
32use store_api::storage::{ColumnId, FileId};
33use tokio::io::duplex;
34use tokio_util::compat::{TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt};
35
36use crate::error::{
37    BiErrorsSnafu, EncodeSnafu, IndexFinishSnafu, OperateAbortedIndexSnafu, PuffinAddBlobSnafu,
38    PushIndexValueSnafu, Result,
39};
40use crate::read::Batch;
41use crate::sst::index::TYPE_INVERTED_INDEX;
42use crate::sst::index::intermediate::{
43    IntermediateLocation, IntermediateManager, TempFileProvider,
44};
45use crate::sst::index::inverted_index::INDEX_BLOB_TYPE;
46use crate::sst::index::puffin_manager::SstPuffinWriter;
47use crate::sst::index::statistics::{ByteCount, RowCount, Statistics};
48
/// Minimum memory usage threshold for each indexed column (1 MiB); handed to the
/// external sorter factory as its per-column lower bound.
const MIN_MEMORY_USAGE_THRESHOLD_PER_COLUMN: usize = 1 << 20;
51
/// Capacity, in bytes, of the in-memory duplex pipe through which the serialized index
/// is streamed into the puffin blob.
const PIPE_BUFFER_SIZE_FOR_SENDING_BLOB: usize = 8 * 1024;
54
55/// `InvertedIndexer` creates inverted index for SST files.
56pub struct InvertedIndexer {
57    /// The index creator.
58    index_creator: Box<dyn InvertedIndexCreator>,
59    /// The provider of intermediate files.
60    temp_file_provider: Arc<TempFileProvider>,
61
62    /// Codec for decoding primary keys.
63    codec: IndexValuesCodec,
64    /// Reusable buffer for encoding index values.
65    value_buf: Vec<u8>,
66
67    /// Statistics of index creation.
68    stats: Statistics,
69    /// Whether the index creation is aborted.
70    aborted: bool,
71
72    /// The memory usage of the index creator.
73    memory_usage: Arc<AtomicUsize>,
74
75    /// Ids of indexed columns and their names (`to_string` of the column id).
76    indexed_column_ids: Vec<(ColumnId, String)>,
77
78    /// Region metadata for column lookups.
79    metadata: RegionMetadataRef,
80    /// Cache for mapping indexed column positions to their indices in the RecordBatch.
81    /// Aligns with indexed_column_ids. Initialized lazily when first batch is processed.
82    column_index_cache: Option<Vec<Option<usize>>>,
83}
84
85impl InvertedIndexer {
86    /// Creates a new `InvertedIndexer`.
87    /// Should ensure that the number of tag columns is greater than 0.
88    pub fn new(
89        sst_file_id: FileId,
90        metadata: &RegionMetadataRef,
91        intermediate_manager: IntermediateManager,
92        memory_usage_threshold: Option<usize>,
93        segment_row_count: NonZeroUsize,
94        indexed_column_ids: HashSet<ColumnId>,
95    ) -> Self {
96        let temp_file_provider = Arc::new(TempFileProvider::new(
97            IntermediateLocation::new(&metadata.region_id, &sst_file_id),
98            intermediate_manager,
99        ));
100
101        let memory_usage = Arc::new(AtomicUsize::new(0));
102
103        let sorter = ExternalSorter::factory(
104            temp_file_provider.clone() as _,
105            Some(MIN_MEMORY_USAGE_THRESHOLD_PER_COLUMN),
106            memory_usage.clone(),
107            memory_usage_threshold,
108        );
109        let index_creator = Box::new(SortIndexCreator::new(sorter, segment_row_count));
110
111        let codec = IndexValuesCodec::from_tag_columns(
112            metadata.primary_key_encoding,
113            metadata.primary_key_columns(),
114        );
115        let indexed_column_ids = indexed_column_ids
116            .into_iter()
117            .map(|col_id| {
118                let col_id_str = col_id.to_string();
119                (col_id, col_id_str)
120            })
121            .collect();
122        Self {
123            codec,
124            index_creator,
125            temp_file_provider,
126            value_buf: vec![],
127            stats: Statistics::new(TYPE_INVERTED_INDEX),
128            aborted: false,
129            memory_usage,
130            indexed_column_ids,
131            metadata: metadata.clone(),
132            column_index_cache: None,
133        }
134    }
135
136    /// Updates index with a batch of rows.
137    /// Garbage will be cleaned up if failed to update.
138    pub async fn update(&mut self, batch: &mut Batch) -> Result<()> {
139        ensure!(!self.aborted, OperateAbortedIndexSnafu);
140
141        if batch.is_empty() {
142            return Ok(());
143        }
144
145        if let Err(update_err) = self.do_update(batch).await {
146            // clean up garbage if failed to update
147            if let Err(err) = self.do_cleanup().await {
148                if cfg!(any(test, feature = "test")) {
149                    panic!("Failed to clean up index creator, err: {err}",);
150                } else {
151                    warn!(err; "Failed to clean up index creator");
152                }
153            }
154            return Err(update_err);
155        }
156
157        Ok(())
158    }
159
160    /// Updates the inverted index with the given flat format RecordBatch.
161    pub async fn update_flat(&mut self, batch: &RecordBatch) -> Result<()> {
162        ensure!(!self.aborted, OperateAbortedIndexSnafu);
163
164        if batch.num_rows() == 0 {
165            return Ok(());
166        }
167
168        self.do_update_flat(batch).await
169    }
170
171    async fn do_update_flat(&mut self, batch: &RecordBatch) -> Result<()> {
172        // Initialize column index cache if not already done
173        if self.column_index_cache.is_none() {
174            self.initialize_column_index_cache(batch);
175        }
176
177        let mut guard = self.stats.record_update();
178
179        let n = batch.num_rows();
180        guard.inc_row_count(n);
181
182        let column_indices = self.column_index_cache.as_ref().unwrap();
183
184        for ((col_id, col_id_str), &column_index) in
185            self.indexed_column_ids.iter().zip(column_indices.iter())
186        {
187            if let Some(index) = column_index {
188                let column_array = batch.column(index);
189                // Convert Arrow array to VectorRef using Helper
190                let vector = Helper::try_into_vector(column_array.clone())
191                    .context(crate::error::ConvertVectorSnafu)?;
192                let sort_field = SortField::new(vector.data_type());
193
194                for row in 0..n {
195                    self.value_buf.clear();
196                    let value_ref = vector.get_ref(row);
197
198                    if value_ref.is_null() {
199                        self.index_creator
200                            .push_with_name(col_id_str, None)
201                            .await
202                            .context(PushIndexValueSnafu)?;
203                    } else {
204                        IndexValueCodec::encode_nonnull_value(
205                            value_ref,
206                            &sort_field,
207                            &mut self.value_buf,
208                        )
209                        .context(EncodeSnafu)?;
210                        self.index_creator
211                            .push_with_name(col_id_str, Some(&self.value_buf))
212                            .await
213                            .context(PushIndexValueSnafu)?;
214                    }
215                }
216            } else {
217                debug!(
218                    "Column {} not found in the batch during building inverted index",
219                    col_id
220                );
221            }
222        }
223
224        Ok(())
225    }
226
227    /// Initializes the column index cache by mapping indexed column ids to their positions in the RecordBatch.
228    fn initialize_column_index_cache(&mut self, batch: &RecordBatch) {
229        let mut column_indices = Vec::with_capacity(self.indexed_column_ids.len());
230
231        for (col_id, _) in &self.indexed_column_ids {
232            let column_index = if let Some(column_meta) = self.metadata.column_by_id(*col_id) {
233                let column_name = &column_meta.column_schema.name;
234                batch
235                    .schema()
236                    .column_with_name(column_name)
237                    .map(|(index, _)| index)
238            } else {
239                None
240            };
241            column_indices.push(column_index);
242        }
243
244        self.column_index_cache = Some(column_indices);
245    }
246
247    /// Finishes index creation and cleans up garbage.
248    /// Returns the number of rows and bytes written.
249    pub async fn finish(
250        &mut self,
251        puffin_writer: &mut SstPuffinWriter,
252    ) -> Result<(RowCount, ByteCount)> {
253        ensure!(!self.aborted, OperateAbortedIndexSnafu);
254
255        if self.stats.row_count() == 0 {
256            // no IO is performed, no garbage to clean up, just return
257            return Ok((0, 0));
258        }
259
260        let finish_res = self.do_finish(puffin_writer).await;
261        // clean up garbage no matter finish successfully or not
262        if let Err(err) = self.do_cleanup().await {
263            if cfg!(any(test, feature = "test")) {
264                panic!("Failed to clean up index creator, err: {err}",);
265            } else {
266                warn!(err; "Failed to clean up index creator");
267            }
268        }
269
270        finish_res.map(|_| (self.stats.row_count(), self.stats.byte_count()))
271    }
272
273    /// Aborts index creation and clean up garbage.
274    pub async fn abort(&mut self) -> Result<()> {
275        if self.aborted {
276            return Ok(());
277        }
278        self.aborted = true;
279
280        self.do_cleanup().await
281    }
282
283    async fn do_update(&mut self, batch: &mut Batch) -> Result<()> {
284        let mut guard = self.stats.record_update();
285
286        let n = batch.num_rows();
287        guard.inc_row_count(n);
288
289        for (col_id, col_id_str) in &self.indexed_column_ids {
290            match self.codec.pk_col_info(*col_id) {
291                // pk
292                Some(col_info) => {
293                    let pk_idx = col_info.idx;
294                    let field = &col_info.field;
295                    let value = batch
296                        .pk_col_value(self.codec.decoder(), pk_idx, *col_id)?
297                        .filter(|v| !v.is_null())
298                        .map(|v| {
299                            self.value_buf.clear();
300                            IndexValueCodec::encode_nonnull_value(
301                                v.as_value_ref(),
302                                field,
303                                &mut self.value_buf,
304                            )
305                            .context(EncodeSnafu)?;
306                            Ok(self.value_buf.as_slice())
307                        })
308                        .transpose()?;
309
310                    self.index_creator
311                        .push_with_name_n(col_id_str, value, n)
312                        .await
313                        .context(PushIndexValueSnafu)?;
314                }
315                // fields
316                None => {
317                    let Some(values) = batch.field_col_value(*col_id) else {
318                        debug!(
319                            "Column {} not found in the batch during building inverted index",
320                            col_id
321                        );
322                        continue;
323                    };
324                    let sort_field = SortField::new(values.data.data_type());
325                    for i in 0..n {
326                        self.value_buf.clear();
327                        let value = values.data.get_ref(i);
328                        if value.is_null() {
329                            self.index_creator
330                                .push_with_name(col_id_str, None)
331                                .await
332                                .context(PushIndexValueSnafu)?;
333                        } else {
334                            IndexValueCodec::encode_nonnull_value(
335                                value,
336                                &sort_field,
337                                &mut self.value_buf,
338                            )
339                            .context(EncodeSnafu)?;
340                            self.index_creator
341                                .push_with_name(col_id_str, Some(&self.value_buf))
342                                .await
343                                .context(PushIndexValueSnafu)?;
344                        }
345                    }
346                }
347            }
348        }
349
350        Ok(())
351    }
352
353    /// Data flow of finishing index:
354    ///
355    /// ```text
356    ///                               (In Memory Buffer)
357    ///                                    ┌──────┐
358    ///  ┌─────────────┐                   │ PIPE │
359    ///  │             │ write index data  │      │
360    ///  │ IndexWriter ├──────────────────►│ tx   │
361    ///  │             │                   │      │
362    ///  └─────────────┘                   │      │
363    ///                  ┌─────────────────┤ rx   │
364    ///  ┌─────────────┐ │ read as blob    └──────┘
365    ///  │             │ │
366    ///  │ PuffinWriter├─┤
367    ///  │             │ │ copy to file    ┌──────┐
368    ///  └─────────────┘ └────────────────►│ File │
369    ///                                    └──────┘
370    /// ```
371    async fn do_finish(&mut self, puffin_writer: &mut SstPuffinWriter) -> Result<()> {
372        let mut guard = self.stats.record_finish();
373
374        let (tx, rx) = duplex(PIPE_BUFFER_SIZE_FOR_SENDING_BLOB);
375        let mut index_writer = InvertedIndexBlobWriter::new(tx.compat_write());
376
377        let (index_finish, puffin_add_blob) = futures::join!(
378            // TODO(zhongzc): config bitmap type
379            self.index_creator
380                .finish(&mut index_writer, index::bitmap::BitmapType::Roaring),
381            puffin_writer.put_blob(
382                INDEX_BLOB_TYPE,
383                rx.compat(),
384                PutOptions::default(),
385                Default::default(),
386            )
387        );
388
389        match (
390            puffin_add_blob.context(PuffinAddBlobSnafu),
391            index_finish.context(IndexFinishSnafu),
392        ) {
393            (Err(e1), Err(e2)) => BiErrorsSnafu {
394                first: Box::new(e1),
395                second: Box::new(e2),
396            }
397            .fail()?,
398
399            (Ok(_), e @ Err(_)) => e?,
400            (e @ Err(_), Ok(_)) => e.map(|_| ())?,
401            (Ok(written_bytes), Ok(_)) => {
402                guard.inc_byte_count(written_bytes);
403            }
404        }
405
406        Ok(())
407    }
408
409    async fn do_cleanup(&mut self) -> Result<()> {
410        let _guard = self.stats.record_cleanup();
411
412        self.temp_file_provider.cleanup().await
413    }
414
415    pub fn column_ids(&self) -> impl Iterator<Item = ColumnId> + '_ {
416        self.indexed_column_ids.iter().map(|(col_id, _)| *col_id)
417    }
418
419    pub fn memory_usage(&self) -> usize {
420        self.memory_usage.load(std::sync::atomic::Ordering::Relaxed)
421    }
422}
423
#[cfg(test)]
mod tests {
    use std::collections::BTreeSet;

    use api::v1::SemanticType;
    use datafusion_expr::{Expr as DfExpr, Operator, binary_expr, col, lit};
    use datatypes::data_type::ConcreteDataType;
    use datatypes::schema::ColumnSchema;
    use datatypes::value::ValueRef;
    use datatypes::vectors::{UInt8Vector, UInt64Vector};
    use futures::future::BoxFuture;
    use mito_codec::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodecExt};
    use object_store::ObjectStore;
    use object_store::services::Memory;
    use puffin::puffin_manager::PuffinManager;
    use puffin::puffin_manager::cache::PuffinMetadataCache;
    use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
    use store_api::region_request::PathType;
    use store_api::storage::RegionId;

    use super::*;
    use crate::access_layer::RegionFilePathFactory;
    use crate::cache::index::inverted_index::InvertedIndexCache;
    use crate::metrics::CACHE_BYTES;
    use crate::read::BatchColumn;
    use crate::sst::file::RegionFileId;
    use crate::sst::index::inverted_index::applier::builder::InvertedIndexApplierBuilder;
    use crate::sst::index::puffin_manager::PuffinManagerFactory;

    /// One fixture row: `(tag_str, tag_i32, [field_u64; 2])`.
    type TestRow = (&'static str, i32, [u64; 2]);

    /// An object store backed by memory.
    fn mock_object_store() -> ObjectStore {
        let builder = ObjectStore::new(Memory::default()).unwrap();
        builder.finish()
    }

    /// An intermediate manager rooted at `path`.
    async fn new_intm_mgr(path: impl AsRef<str>) -> IntermediateManager {
        let manager = IntermediateManager::init_fs(path).await;
        manager.unwrap()
    }

    /// Region with two tags (`tag_str`, `tag_i32`), a timestamp and a `u64` field.
    fn mock_region_metadata() -> RegionMetadataRef {
        let columns = [
            (
                "tag_str",
                ConcreteDataType::string_datatype(),
                SemanticType::Tag,
                1,
            ),
            (
                "tag_i32",
                ConcreteDataType::int32_datatype(),
                SemanticType::Tag,
                2,
            ),
            (
                "ts",
                ConcreteDataType::timestamp_millisecond_datatype(),
                SemanticType::Timestamp,
                3,
            ),
            (
                "field_u64",
                ConcreteDataType::uint64_datatype(),
                SemanticType::Field,
                4,
            ),
        ];

        let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 2));
        for (name, data_type, semantic_type, column_id) in columns {
            builder.push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(name, data_type, false),
                semantic_type,
                column_id,
            });
        }
        builder.primary_key(vec![1, 2]);

        Arc::new(builder.build().unwrap())
    }

    /// A batch whose rows share the given primary key and carry `u64_field` as field values.
    fn new_batch(
        str_tag: impl AsRef<str>,
        i32_tag: impl Into<i32>,
        u64_field: impl IntoIterator<Item = u64>,
    ) -> Batch {
        let pk_codec = DensePrimaryKeyCodec::with_fields(vec![
            (0, SortField::new(ConcreteDataType::string_datatype())),
            (1, SortField::new(ConcreteDataType::int32_datatype())),
        ]);
        let pk_values: [ValueRef; 2] = [str_tag.as_ref().into(), i32_tag.into().into()];
        let primary_key = pk_codec.encode(pk_values.into_iter()).unwrap();

        let field_column = BatchColumn {
            column_id: 4,
            data: Arc::new(UInt64Vector::from_iter_values(u64_field)),
        };
        let num_rows = field_column.data.len();
        let zeros = || Arc::new(UInt64Vector::from_iter_values(std::iter::repeat_n(0, num_rows)));

        Batch::new(
            primary_key,
            zeros(),
            zeros(),
            Arc::new(UInt8Vector::from_iter_values(std::iter::repeat_n(
                1, num_rows,
            ))),
            vec![field_column],
        )
        .unwrap()
    }

    /// Indexes `rows` into a puffin file, then returns a closure that applies an
    /// expression to that file and yields the matched segment ids.
    async fn build_applier_factory(
        prefix: &str,
        rows: BTreeSet<TestRow>,
    ) -> impl Fn(DfExpr) -> BoxFuture<'static, Vec<usize>> {
        let (temp_dir, puffin_factory) = PuffinManagerFactory::new_for_test_async(prefix).await;
        let table_dir = "table0".to_string();
        let file_id = FileId::random();
        let object_store = mock_object_store();
        let region_metadata = mock_region_metadata();
        let intermediate_manager = new_intm_mgr(temp_dir.path().to_string_lossy()).await;
        let segment_row_count = 2;
        let indexed_column_ids = HashSet::from_iter([1, 2, 4]);

        // Index every fixture row.
        let mut indexer = InvertedIndexer::new(
            file_id,
            &region_metadata,
            intermediate_manager,
            None,
            NonZeroUsize::new(segment_row_count).unwrap(),
            indexed_column_ids.clone(),
        );
        for &(tag_str, tag_i32, field_values) in &rows {
            let mut batch = new_batch(tag_str, tag_i32, field_values);
            indexer.update(&mut batch).await.unwrap();
        }

        // Persist the index into a puffin file.
        let puffin_manager = puffin_factory.build(
            object_store.clone(),
            RegionFilePathFactory::new(table_dir.clone(), PathType::Bare),
        );
        let region_file_id = RegionFileId::new(region_metadata.region_id, file_id);
        let mut writer = puffin_manager.writer(&region_file_id).await.unwrap();
        let (row_count, _) = indexer.finish(&mut writer).await.unwrap();
        assert_eq!(row_count, rows.len() * segment_row_count);
        writer.finish().await.unwrap();

        move |expr| {
            // Keep the temporary directory alive for as long as the closure exists.
            let _temp_dir = &temp_dir;
            let index_cache = Arc::new(InvertedIndexCache::new(10, 10, 100));
            let metadata_cache = Arc::new(PuffinMetadataCache::new(10, &CACHE_BYTES));
            let applier = InvertedIndexApplierBuilder::new(
                table_dir.clone(),
                PathType::Bare,
                object_store.clone(),
                &region_metadata,
                indexed_column_ids.clone(),
                puffin_factory.clone(),
            )
            .with_inverted_index_cache(Some(index_cache))
            .with_puffin_metadata_cache(Some(metadata_cache))
            .build(&[expr])
            .unwrap()
            .unwrap();
            Box::pin(async move {
                applier
                    .apply(region_file_id, None)
                    .await
                    .unwrap()
                    .matched_segment_ids
                    .iter_ones()
                    .collect()
            })
        }
    }

    /// The nine rows shared by every test.
    fn sample_rows() -> BTreeSet<TestRow> {
        BTreeSet::from_iter([
            ("aaa", 1, [1, 2]),
            ("aaa", 2, [2, 3]),
            ("aaa", 3, [3, 4]),
            ("aab", 1, [4, 5]),
            ("aab", 2, [5, 6]),
            ("aab", 3, [6, 7]),
            ("abc", 1, [7, 8]),
            ("abc", 2, [8, 9]),
            ("abc", 3, [9, 10]),
        ])
    }

    /// Checks each `(expression, expected segment ids)` pair against the applier factory.
    async fn assert_cases(
        applier_factory: impl Fn(DfExpr) -> BoxFuture<'static, Vec<usize>>,
        cases: Vec<(DfExpr, Vec<usize>)>,
    ) {
        for (expr, expected) in cases {
            let actual = applier_factory(expr).await;
            assert_eq!(actual, expected);
        }
    }

    #[tokio::test]
    async fn test_create_and_query_get_key() {
        let applier_factory =
            build_applier_factory("test_create_and_query_get_key_", sample_rows()).await;

        assert_cases(
            applier_factory,
            vec![
                (col("tag_str").eq(lit("aaa")), vec![0, 1, 2]),
                (col("tag_i32").eq(lit(2)), vec![1, 4, 7]),
                (
                    col("tag_str").eq(lit("aaa")).and(col("tag_i32").eq(lit(2))),
                    vec![1],
                ),
                (
                    col("tag_str")
                        .eq(lit("aaa"))
                        .or(col("tag_str").eq(lit("abc"))),
                    vec![0, 1, 2, 6, 7, 8],
                ),
                (
                    col("tag_str").in_list(vec![lit("aaa"), lit("abc")], false),
                    vec![0, 1, 2, 6, 7, 8],
                ),
                (col("field_u64").eq(lit(2u64)), vec![0, 1]),
            ],
        )
        .await;
    }

    #[tokio::test]
    async fn test_create_and_query_range() {
        let applier_factory =
            build_applier_factory("test_create_and_query_range_", sample_rows()).await;

        assert_cases(
            applier_factory,
            vec![
                (
                    col("tag_str").between(lit("aaa"), lit("aab")),
                    vec![0, 1, 2, 3, 4, 5],
                ),
                (
                    col("tag_i32").between(lit(2), lit(3)),
                    vec![1, 2, 4, 5, 7, 8],
                ),
                (col("tag_str").between(lit("aaa"), lit("aaa")), vec![0, 1, 2]),
                (col("tag_i32").between(lit(2), lit(2)), vec![1, 4, 7]),
                (
                    col("field_u64").between(lit(2u64), lit(5u64)),
                    vec![0, 1, 2, 3, 4],
                ),
            ],
        )
        .await;
    }

    #[tokio::test]
    async fn test_create_and_query_comparison() {
        let applier_factory =
            build_applier_factory("test_create_and_query_comparison_", sample_rows()).await;

        assert_cases(
            applier_factory,
            vec![
                (col("tag_str").lt(lit("aab")), vec![0, 1, 2]),
                (col("tag_i32").lt(lit(2)), vec![0, 3, 6]),
                (col("field_u64").lt(lit(2u64)), vec![0]),
                (col("tag_str").gt(lit("aab")), vec![6, 7, 8]),
                (col("tag_i32").gt(lit(2)), vec![2, 5, 8]),
                (col("field_u64").gt(lit(8u64)), vec![7, 8]),
                (col("tag_str").lt_eq(lit("aab")), vec![0, 1, 2, 3, 4, 5]),
                (col("tag_i32").lt_eq(lit(2)), vec![0, 1, 3, 4, 6, 7]),
                (col("field_u64").lt_eq(lit(2u64)), vec![0, 1]),
                (col("tag_str").gt_eq(lit("aab")), vec![3, 4, 5, 6, 7, 8]),
                (col("tag_i32").gt_eq(lit(2)), vec![1, 2, 4, 5, 7, 8]),
                (col("field_u64").gt_eq(lit(8u64)), vec![6, 7, 8]),
                (
                    col("tag_str")
                        .gt(lit("aaa"))
                        .and(col("tag_str").lt(lit("abc"))),
                    vec![3, 4, 5],
                ),
                (
                    col("tag_i32").gt(lit(1)).and(col("tag_i32").lt(lit(3))),
                    vec![1, 4, 7],
                ),
                (
                    col("field_u64")
                        .gt(lit(2u64))
                        .and(col("field_u64").lt(lit(9u64))),
                    vec![1, 2, 3, 4, 5, 6, 7],
                ),
            ],
        )
        .await;
    }

    #[tokio::test]
    async fn test_create_and_query_regex() {
        let applier_factory =
            build_applier_factory("test_create_and_query_regex_", sample_rows()).await;

        let regex = |pattern: &str| binary_expr(col("tag_str"), Operator::RegexMatch, lit(pattern));

        assert_cases(
            applier_factory,
            vec![
                (regex(".*"), vec![0, 1, 2, 3, 4, 5, 6, 7, 8]),
                (regex("a.*c"), vec![6, 7, 8]),
                (regex("a.*b$"), vec![3, 4, 5]),
                (regex("\\w"), vec![0, 1, 2, 3, 4, 5, 6, 7, 8]),
                (regex("\\d"), vec![]),
                (regex("^aaa$"), vec![0, 1, 2]),
            ],
        )
        .await;
    }
}