mito2/sst/index/inverted_index/
creator.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashSet;
16use std::num::NonZeroUsize;
17use std::sync::atomic::AtomicUsize;
18use std::sync::Arc;
19
20use common_telemetry::{debug, warn};
21use datatypes::arrow::record_batch::RecordBatch;
22use datatypes::vectors::Helper;
23use index::inverted_index::create::sort::external_sort::ExternalSorter;
24use index::inverted_index::create::sort_create::SortIndexCreator;
25use index::inverted_index::create::InvertedIndexCreator;
26use index::inverted_index::format::writer::InvertedIndexBlobWriter;
27use mito_codec::index::{IndexValueCodec, IndexValuesCodec};
28use mito_codec::row_converter::SortField;
29use puffin::puffin_manager::{PuffinWriter, PutOptions};
30use snafu::{ensure, ResultExt};
31use store_api::metadata::RegionMetadataRef;
32use store_api::storage::ColumnId;
33use tokio::io::duplex;
34use tokio_util::compat::{TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt};
35
36use crate::error::{
37    BiErrorsSnafu, EncodeSnafu, IndexFinishSnafu, OperateAbortedIndexSnafu, PuffinAddBlobSnafu,
38    PushIndexValueSnafu, Result,
39};
40use crate::read::Batch;
41use crate::sst::file::FileId;
42use crate::sst::index::intermediate::{
43    IntermediateLocation, IntermediateManager, TempFileProvider,
44};
45use crate::sst::index::inverted_index::INDEX_BLOB_TYPE;
46use crate::sst::index::puffin_manager::SstPuffinWriter;
47use crate::sst::index::statistics::{ByteCount, RowCount, Statistics};
48use crate::sst::index::TYPE_INVERTED_INDEX;
49
/// Per-column minimum memory usage threshold handed to the external sorters (1 MiB).
const MIN_MEMORY_USAGE_THRESHOLD_PER_COLUMN: usize = 1 << 20;

/// Capacity, in bytes, of the in-memory duplex pipe that streams index data into the puffin blob.
const PIPE_BUFFER_SIZE_FOR_SENDING_BLOB: usize = 8 * 1024;
55
/// `InvertedIndexer` creates inverted index for SST files.
///
/// Rows are fed through [`InvertedIndexer::update`] (primary-key format `Batch`) or
/// [`InvertedIndexer::update_flat`] (flat `RecordBatch`). [`InvertedIndexer::finish`] writes the
/// resulting index into a puffin blob, and [`InvertedIndexer::abort`] discards it.
pub struct InvertedIndexer {
    /// The index creator that receives the encoded values of every indexed column.
    index_creator: Box<dyn InvertedIndexCreator>,
    /// The provider of intermediate files written while building the index. These files are
    /// cleaned up on finish, on abort, and after a failed `update`.
    temp_file_provider: Arc<TempFileProvider>,

    /// Codec for decoding primary keys.
    codec: IndexValuesCodec,
    /// Reusable scratch buffer for encoding index values; cleared before each encode.
    value_buf: Vec<u8>,

    /// Statistics of index creation (row count, byte count, timings).
    stats: Statistics,
    /// Whether the index creation is aborted. Once set, `update`, `update_flat` and `finish`
    /// fail with `OperateAbortedIndex`.
    aborted: bool,

    /// The memory usage of the index creator, shared with the external sorters that update it.
    memory_usage: Arc<AtomicUsize>,

    /// Ids of indexed columns and their names (`to_string` of the column id).
    /// The name is the key under which each column's values are pushed to `index_creator`.
    indexed_column_ids: Vec<(ColumnId, String)>,

    /// Region metadata for column lookups (resolves column names in flat-format batches).
    metadata: RegionMetadataRef,
    /// Cache for mapping indexed column positions to their indices in the RecordBatch.
    /// Aligns with indexed_column_ids; `None` entries mark columns absent from the batch.
    /// Initialized lazily when the first flat batch is processed and reused afterwards.
    column_index_cache: Option<Vec<Option<usize>>>,
}
85
86impl InvertedIndexer {
87    /// Creates a new `InvertedIndexer`.
88    /// Should ensure that the number of tag columns is greater than 0.
89    pub fn new(
90        sst_file_id: FileId,
91        metadata: &RegionMetadataRef,
92        intermediate_manager: IntermediateManager,
93        memory_usage_threshold: Option<usize>,
94        segment_row_count: NonZeroUsize,
95        indexed_column_ids: HashSet<ColumnId>,
96    ) -> Self {
97        let temp_file_provider = Arc::new(TempFileProvider::new(
98            IntermediateLocation::new(&metadata.region_id, &sst_file_id),
99            intermediate_manager,
100        ));
101
102        let memory_usage = Arc::new(AtomicUsize::new(0));
103
104        let sorter = ExternalSorter::factory(
105            temp_file_provider.clone() as _,
106            Some(MIN_MEMORY_USAGE_THRESHOLD_PER_COLUMN),
107            memory_usage.clone(),
108            memory_usage_threshold,
109        );
110        let index_creator = Box::new(SortIndexCreator::new(sorter, segment_row_count));
111
112        let codec = IndexValuesCodec::from_tag_columns(
113            metadata.primary_key_encoding,
114            metadata.primary_key_columns(),
115        );
116        let indexed_column_ids = indexed_column_ids
117            .into_iter()
118            .map(|col_id| {
119                let col_id_str = col_id.to_string();
120                (col_id, col_id_str)
121            })
122            .collect();
123        Self {
124            codec,
125            index_creator,
126            temp_file_provider,
127            value_buf: vec![],
128            stats: Statistics::new(TYPE_INVERTED_INDEX),
129            aborted: false,
130            memory_usage,
131            indexed_column_ids,
132            metadata: metadata.clone(),
133            column_index_cache: None,
134        }
135    }
136
137    /// Updates index with a batch of rows.
138    /// Garbage will be cleaned up if failed to update.
139    pub async fn update(&mut self, batch: &mut Batch) -> Result<()> {
140        ensure!(!self.aborted, OperateAbortedIndexSnafu);
141
142        if batch.is_empty() {
143            return Ok(());
144        }
145
146        if let Err(update_err) = self.do_update(batch).await {
147            // clean up garbage if failed to update
148            if let Err(err) = self.do_cleanup().await {
149                if cfg!(any(test, feature = "test")) {
150                    panic!("Failed to clean up index creator, err: {err}",);
151                } else {
152                    warn!(err; "Failed to clean up index creator");
153                }
154            }
155            return Err(update_err);
156        }
157
158        Ok(())
159    }
160
161    /// Updates the inverted index with the given flat format RecordBatch.
162    pub async fn update_flat(&mut self, batch: &RecordBatch) -> Result<()> {
163        ensure!(!self.aborted, OperateAbortedIndexSnafu);
164
165        if batch.num_rows() == 0 {
166            return Ok(());
167        }
168
169        self.do_update_flat(batch).await
170    }
171
172    async fn do_update_flat(&mut self, batch: &RecordBatch) -> Result<()> {
173        // Initialize column index cache if not already done
174        if self.column_index_cache.is_none() {
175            self.initialize_column_index_cache(batch);
176        }
177
178        let mut guard = self.stats.record_update();
179
180        let n = batch.num_rows();
181        guard.inc_row_count(n);
182
183        let column_indices = self.column_index_cache.as_ref().unwrap();
184
185        for ((col_id, col_id_str), &column_index) in
186            self.indexed_column_ids.iter().zip(column_indices.iter())
187        {
188            if let Some(index) = column_index {
189                let column_array = batch.column(index);
190                // Convert Arrow array to VectorRef using Helper
191                let vector = Helper::try_into_vector(column_array.clone())
192                    .context(crate::error::ConvertVectorSnafu)?;
193                let sort_field = SortField::new(vector.data_type());
194
195                for row in 0..n {
196                    self.value_buf.clear();
197                    let value_ref = vector.get_ref(row);
198
199                    if value_ref.is_null() {
200                        self.index_creator
201                            .push_with_name(col_id_str, None)
202                            .await
203                            .context(PushIndexValueSnafu)?;
204                    } else {
205                        IndexValueCodec::encode_nonnull_value(
206                            value_ref,
207                            &sort_field,
208                            &mut self.value_buf,
209                        )
210                        .context(EncodeSnafu)?;
211                        self.index_creator
212                            .push_with_name(col_id_str, Some(&self.value_buf))
213                            .await
214                            .context(PushIndexValueSnafu)?;
215                    }
216                }
217            } else {
218                debug!(
219                    "Column {} not found in the batch during building inverted index",
220                    col_id
221                );
222            }
223        }
224
225        Ok(())
226    }
227
228    /// Initializes the column index cache by mapping indexed column ids to their positions in the RecordBatch.
229    fn initialize_column_index_cache(&mut self, batch: &RecordBatch) {
230        let mut column_indices = Vec::with_capacity(self.indexed_column_ids.len());
231
232        for (col_id, _) in &self.indexed_column_ids {
233            let column_index = if let Some(column_meta) = self.metadata.column_by_id(*col_id) {
234                let column_name = &column_meta.column_schema.name;
235                batch
236                    .schema()
237                    .column_with_name(column_name)
238                    .map(|(index, _)| index)
239            } else {
240                None
241            };
242            column_indices.push(column_index);
243        }
244
245        self.column_index_cache = Some(column_indices);
246    }
247
248    /// Finishes index creation and cleans up garbage.
249    /// Returns the number of rows and bytes written.
250    pub async fn finish(
251        &mut self,
252        puffin_writer: &mut SstPuffinWriter,
253    ) -> Result<(RowCount, ByteCount)> {
254        ensure!(!self.aborted, OperateAbortedIndexSnafu);
255
256        if self.stats.row_count() == 0 {
257            // no IO is performed, no garbage to clean up, just return
258            return Ok((0, 0));
259        }
260
261        let finish_res = self.do_finish(puffin_writer).await;
262        // clean up garbage no matter finish successfully or not
263        if let Err(err) = self.do_cleanup().await {
264            if cfg!(any(test, feature = "test")) {
265                panic!("Failed to clean up index creator, err: {err}",);
266            } else {
267                warn!(err; "Failed to clean up index creator");
268            }
269        }
270
271        finish_res.map(|_| (self.stats.row_count(), self.stats.byte_count()))
272    }
273
274    /// Aborts index creation and clean up garbage.
275    pub async fn abort(&mut self) -> Result<()> {
276        if self.aborted {
277            return Ok(());
278        }
279        self.aborted = true;
280
281        self.do_cleanup().await
282    }
283
284    async fn do_update(&mut self, batch: &mut Batch) -> Result<()> {
285        let mut guard = self.stats.record_update();
286
287        let n = batch.num_rows();
288        guard.inc_row_count(n);
289
290        for (col_id, col_id_str) in &self.indexed_column_ids {
291            match self.codec.pk_col_info(*col_id) {
292                // pk
293                Some(col_info) => {
294                    let pk_idx = col_info.idx;
295                    let field = &col_info.field;
296                    let value = batch
297                        .pk_col_value(self.codec.decoder(), pk_idx, *col_id)?
298                        .filter(|v| !v.is_null())
299                        .map(|v| {
300                            self.value_buf.clear();
301                            IndexValueCodec::encode_nonnull_value(
302                                v.as_value_ref(),
303                                field,
304                                &mut self.value_buf,
305                            )
306                            .context(EncodeSnafu)?;
307                            Ok(self.value_buf.as_slice())
308                        })
309                        .transpose()?;
310
311                    self.index_creator
312                        .push_with_name_n(col_id_str, value, n)
313                        .await
314                        .context(PushIndexValueSnafu)?;
315                }
316                // fields
317                None => {
318                    let Some(values) = batch.field_col_value(*col_id) else {
319                        debug!(
320                            "Column {} not found in the batch during building inverted index",
321                            col_id
322                        );
323                        continue;
324                    };
325                    let sort_field = SortField::new(values.data.data_type());
326                    for i in 0..n {
327                        self.value_buf.clear();
328                        let value = values.data.get_ref(i);
329                        if value.is_null() {
330                            self.index_creator
331                                .push_with_name(col_id_str, None)
332                                .await
333                                .context(PushIndexValueSnafu)?;
334                        } else {
335                            IndexValueCodec::encode_nonnull_value(
336                                value,
337                                &sort_field,
338                                &mut self.value_buf,
339                            )
340                            .context(EncodeSnafu)?;
341                            self.index_creator
342                                .push_with_name(col_id_str, Some(&self.value_buf))
343                                .await
344                                .context(PushIndexValueSnafu)?;
345                        }
346                    }
347                }
348            }
349        }
350
351        Ok(())
352    }
353
354    /// Data flow of finishing index:
355    ///
356    /// ```text
357    ///                               (In Memory Buffer)
358    ///                                    ┌──────┐
359    ///  ┌─────────────┐                   │ PIPE │
360    ///  │             │ write index data  │      │
361    ///  │ IndexWriter ├──────────────────►│ tx   │
362    ///  │             │                   │      │
363    ///  └─────────────┘                   │      │
364    ///                  ┌─────────────────┤ rx   │
365    ///  ┌─────────────┐ │ read as blob    └──────┘
366    ///  │             │ │
367    ///  │ PuffinWriter├─┤
368    ///  │             │ │ copy to file    ┌──────┐
369    ///  └─────────────┘ └────────────────►│ File │
370    ///                                    └──────┘
371    /// ```
372    async fn do_finish(&mut self, puffin_writer: &mut SstPuffinWriter) -> Result<()> {
373        let mut guard = self.stats.record_finish();
374
375        let (tx, rx) = duplex(PIPE_BUFFER_SIZE_FOR_SENDING_BLOB);
376        let mut index_writer = InvertedIndexBlobWriter::new(tx.compat_write());
377
378        let (index_finish, puffin_add_blob) = futures::join!(
379            // TODO(zhongzc): config bitmap type
380            self.index_creator
381                .finish(&mut index_writer, index::bitmap::BitmapType::Roaring),
382            puffin_writer.put_blob(
383                INDEX_BLOB_TYPE,
384                rx.compat(),
385                PutOptions::default(),
386                Default::default(),
387            )
388        );
389
390        match (
391            puffin_add_blob.context(PuffinAddBlobSnafu),
392            index_finish.context(IndexFinishSnafu),
393        ) {
394            (Err(e1), Err(e2)) => BiErrorsSnafu {
395                first: Box::new(e1),
396                second: Box::new(e2),
397            }
398            .fail()?,
399
400            (Ok(_), e @ Err(_)) => e?,
401            (e @ Err(_), Ok(_)) => e.map(|_| ())?,
402            (Ok(written_bytes), Ok(_)) => {
403                guard.inc_byte_count(written_bytes);
404            }
405        }
406
407        Ok(())
408    }
409
410    async fn do_cleanup(&mut self) -> Result<()> {
411        let _guard = self.stats.record_cleanup();
412
413        self.temp_file_provider.cleanup().await
414    }
415
416    pub fn column_ids(&self) -> impl Iterator<Item = ColumnId> + '_ {
417        self.indexed_column_ids.iter().map(|(col_id, _)| *col_id)
418    }
419
420    pub fn memory_usage(&self) -> usize {
421        self.memory_usage.load(std::sync::atomic::Ordering::Relaxed)
422    }
423}
424
#[cfg(test)]
mod tests {
    use std::collections::BTreeSet;

    use api::v1::SemanticType;
    use datafusion_expr::{binary_expr, col, lit, Expr as DfExpr, Operator};
    use datatypes::arrow::array::{ArrayRef, Int32Array, StringArray, UInt64Array};
    use datatypes::arrow::datatypes::{DataType, Field, Schema};
    use datatypes::data_type::ConcreteDataType;
    use datatypes::schema::ColumnSchema;
    use datatypes::value::ValueRef;
    use datatypes::vectors::{UInt64Vector, UInt8Vector};
    use futures::future::BoxFuture;
    use mito_codec::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodecExt};
    use object_store::services::Memory;
    use object_store::ObjectStore;
    use puffin::puffin_manager::cache::PuffinMetadataCache;
    use puffin::puffin_manager::PuffinManager;
    use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
    use store_api::region_request::PathType;
    use store_api::storage::RegionId;

    use super::*;
    use crate::access_layer::RegionFilePathFactory;
    use crate::cache::index::inverted_index::InvertedIndexCache;
    use crate::metrics::CACHE_BYTES;
    use crate::read::BatchColumn;
    use crate::sst::file::RegionFileId;
    use crate::sst::index::inverted_index::applier::builder::InvertedIndexApplierBuilder;
    use crate::sst::index::puffin_manager::PuffinManagerFactory;

    /// Test data entries of `(tag_str, tag_i32, field_u64 values)`.
    /// Each entry expands to one SST row per `field_u64` value.
    type TestRows = BTreeSet<(&'static str, i32, [u64; 2])>;

    /// How rows are fed into the indexer under test.
    #[derive(Debug, Clone, Copy, PartialEq, Eq)]
    enum BatchFormat {
        /// One primary-key `Batch` per entry, via `InvertedIndexer::update`.
        PrimaryKey,
        /// A single flat `RecordBatch`, via `InvertedIndexer::update_flat`.
        Flat,
    }

    fn mock_object_store() -> ObjectStore {
        ObjectStore::new(Memory::default()).unwrap().finish()
    }

    async fn new_intm_mgr(path: impl AsRef<str>) -> IntermediateManager {
        IntermediateManager::init_fs(path).await.unwrap()
    }

    fn mock_region_metadata() -> RegionMetadataRef {
        let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 2));
        builder
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "tag_str",
                    ConcreteDataType::string_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Tag,
                column_id: 1,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "tag_i32",
                    ConcreteDataType::int32_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Tag,
                column_id: 2,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "ts",
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Timestamp,
                column_id: 3,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "field_u64",
                    ConcreteDataType::uint64_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Field,
                column_id: 4,
            })
            .primary_key(vec![1, 2]);

        Arc::new(builder.build().unwrap())
    }

    fn new_batch(
        str_tag: impl AsRef<str>,
        i32_tag: impl Into<i32>,
        u64_field: impl IntoIterator<Item = u64>,
    ) -> Batch {
        let fields = vec![
            (0, SortField::new(ConcreteDataType::string_datatype())),
            (1, SortField::new(ConcreteDataType::int32_datatype())),
        ];
        let codec = DensePrimaryKeyCodec::with_fields(fields);
        let row: [ValueRef; 2] = [str_tag.as_ref().into(), i32_tag.into().into()];
        let primary_key = codec.encode(row.into_iter()).unwrap();

        let u64_field = BatchColumn {
            column_id: 4,
            data: Arc::new(UInt64Vector::from_iter_values(u64_field)),
        };
        let num_rows = u64_field.data.len();

        Batch::new(
            primary_key,
            Arc::new(UInt64Vector::from_iter_values(std::iter::repeat_n(
                0, num_rows,
            ))),
            Arc::new(UInt64Vector::from_iter_values(std::iter::repeat_n(
                0, num_rows,
            ))),
            Arc::new(UInt8Vector::from_iter_values(std::iter::repeat_n(
                1, num_rows,
            ))),
            vec![u64_field],
        )
        .unwrap()
    }

    /// Builds a flat-format `RecordBatch` with the indexed columns of `rows`.
    ///
    /// Each entry produces one row per `field_u64` value. Rows follow the entry order, the
    /// same order in which the primary-key path feeds `new_batch` results.
    fn new_flat_batch(rows: &TestRows) -> RecordBatch {
        let expanded: Vec<(&str, i32, u64)> = rows
            .iter()
            .flat_map(|(str_tag, i32_tag, u64_fields)| {
                u64_fields.iter().map(move |v| (*str_tag, *i32_tag, *v))
            })
            .collect();

        let schema = Arc::new(Schema::new(vec![
            Field::new("tag_str", DataType::Utf8, false),
            Field::new("tag_i32", DataType::Int32, false),
            Field::new("field_u64", DataType::UInt64, false),
        ]));
        let columns: Vec<ArrayRef> = vec![
            Arc::new(StringArray::from_iter_values(
                expanded.iter().map(|(s, _, _)| *s),
            )),
            Arc::new(Int32Array::from_iter_values(
                expanded.iter().map(|(_, i, _)| *i),
            )),
            Arc::new(UInt64Array::from_iter_values(
                expanded.iter().map(|(_, _, v)| *v),
            )),
        ];
        RecordBatch::try_new(schema, columns).unwrap()
    }

    /// The data set shared by the query tests: every combination of three string tags and
    /// three integer tags, each with two field values.
    fn mock_rows() -> TestRows {
        BTreeSet::from_iter([
            ("aaa", 1, [1, 2]),
            ("aaa", 2, [2, 3]),
            ("aaa", 3, [3, 4]),
            ("aab", 1, [4, 5]),
            ("aab", 2, [5, 6]),
            ("aab", 3, [6, 7]),
            ("abc", 1, [7, 8]),
            ("abc", 2, [8, 9]),
            ("abc", 3, [9, 10]),
        ])
    }

    /// Indexes `rows` in the given `format` and returns a function that applies an expression
    /// to the written index, yielding the matched segment ids.
    async fn build_applier_factory(
        prefix: &str,
        rows: TestRows,
        format: BatchFormat,
    ) -> impl Fn(DfExpr) -> BoxFuture<'static, Vec<usize>> {
        let (d, factory) = PuffinManagerFactory::new_for_test_async(prefix).await;
        let table_dir = "table0".to_string();
        let sst_file_id = FileId::random();
        let object_store = mock_object_store();
        let region_metadata = mock_region_metadata();
        let intm_mgr = new_intm_mgr(d.path().to_string_lossy()).await;
        let memory_threshold = None;
        let segment_row_count = 2;
        let indexed_column_ids = HashSet::from_iter([1, 2, 4]);

        let mut creator = InvertedIndexer::new(
            sst_file_id,
            &region_metadata,
            intm_mgr,
            memory_threshold,
            NonZeroUsize::new(segment_row_count).unwrap(),
            indexed_column_ids.clone(),
        );

        match format {
            BatchFormat::PrimaryKey => {
                for (str_tag, i32_tag, u64_field) in &rows {
                    let mut batch = new_batch(str_tag, *i32_tag, u64_field.iter().copied());
                    creator.update(&mut batch).await.unwrap();
                }
            }
            BatchFormat::Flat => {
                let batch = new_flat_batch(&rows);
                creator.update_flat(&batch).await.unwrap();
            }
        }

        let puffin_manager = factory.build(
            object_store.clone(),
            RegionFilePathFactory::new(table_dir.clone(), PathType::Bare),
        );

        let sst_file_id = RegionFileId::new(region_metadata.region_id, sst_file_id);
        let mut writer = puffin_manager.writer(&sst_file_id).await.unwrap();
        let (row_count, _) = creator.finish(&mut writer).await.unwrap();
        // Every entry contributes one row per field value.
        let expected_row_count: usize = rows.iter().map(|(_, _, fields)| fields.len()).sum();
        assert_eq!(row_count, expected_row_count);
        writer.finish().await.unwrap();

        move |expr| {
            // Keep the temp dir alive for as long as the factory is.
            let _d = &d;
            let cache = Arc::new(InvertedIndexCache::new(10, 10, 100));
            let puffin_metadata_cache = Arc::new(PuffinMetadataCache::new(10, &CACHE_BYTES));
            let applier = InvertedIndexApplierBuilder::new(
                table_dir.clone(),
                PathType::Bare,
                object_store.clone(),
                &region_metadata,
                indexed_column_ids.clone(),
                factory.clone(),
            )
            .with_inverted_index_cache(Some(cache))
            .with_puffin_metadata_cache(Some(puffin_metadata_cache))
            .build(&[expr])
            .unwrap()
            .unwrap();
            Box::pin(async move {
                applier
                    .apply(sst_file_id, None)
                    .await
                    .unwrap()
                    .matched_segment_ids
                    .iter_ones()
                    .collect()
            })
        }
    }

    #[tokio::test]
    async fn test_create_and_query_get_key() {
        let applier_factory = build_applier_factory(
            "test_create_and_query_get_key_",
            mock_rows(),
            BatchFormat::PrimaryKey,
        )
        .await;

        let expr = col("tag_str").eq(lit("aaa"));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![0, 1, 2]);

        let expr = col("tag_i32").eq(lit(2));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![1, 4, 7]);

        let expr = col("tag_str").eq(lit("aaa")).and(col("tag_i32").eq(lit(2)));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![1]);

        let expr = col("tag_str")
            .eq(lit("aaa"))
            .or(col("tag_str").eq(lit("abc")));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![0, 1, 2, 6, 7, 8]);

        let expr = col("tag_str").in_list(vec![lit("aaa"), lit("abc")], false);
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![0, 1, 2, 6, 7, 8]);

        let expr = col("field_u64").eq(lit(2u64));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![0, 1]);
    }

    #[tokio::test]
    async fn test_create_flat_and_query_get_key() {
        let applier_factory = build_applier_factory(
            "test_create_flat_and_query_get_key_",
            mock_rows(),
            BatchFormat::Flat,
        )
        .await;

        let expr = col("tag_str").eq(lit("aaa"));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![0, 1, 2]);

        let expr = col("tag_i32").eq(lit(2));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![1, 4, 7]);

        let expr = col("tag_str").eq(lit("aaa")).and(col("tag_i32").eq(lit(2)));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![1]);

        let expr = col("field_u64").eq(lit(2u64));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![0, 1]);
    }

    #[tokio::test]
    async fn test_create_and_query_range() {
        let applier_factory = build_applier_factory(
            "test_create_and_query_range_",
            mock_rows(),
            BatchFormat::PrimaryKey,
        )
        .await;

        let expr = col("tag_str").between(lit("aaa"), lit("aab"));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![0, 1, 2, 3, 4, 5]);

        let expr = col("tag_i32").between(lit(2), lit(3));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![1, 2, 4, 5, 7, 8]);

        let expr = col("tag_str").between(lit("aaa"), lit("aaa"));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![0, 1, 2]);

        let expr = col("tag_i32").between(lit(2), lit(2));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![1, 4, 7]);

        let expr = col("field_u64").between(lit(2u64), lit(5u64));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![0, 1, 2, 3, 4]);
    }

    #[tokio::test]
    async fn test_create_and_query_comparison() {
        let applier_factory = build_applier_factory(
            "test_create_and_query_comparison_",
            mock_rows(),
            BatchFormat::PrimaryKey,
        )
        .await;

        let expr = col("tag_str").lt(lit("aab"));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![0, 1, 2]);

        let expr = col("tag_i32").lt(lit(2));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![0, 3, 6]);

        let expr = col("field_u64").lt(lit(2u64));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![0]);

        let expr = col("tag_str").gt(lit("aab"));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![6, 7, 8]);

        let expr = col("tag_i32").gt(lit(2));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![2, 5, 8]);

        let expr = col("field_u64").gt(lit(8u64));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![7, 8]);

        let expr = col("tag_str").lt_eq(lit("aab"));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![0, 1, 2, 3, 4, 5]);

        let expr = col("tag_i32").lt_eq(lit(2));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![0, 1, 3, 4, 6, 7]);

        let expr = col("field_u64").lt_eq(lit(2u64));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![0, 1]);

        let expr = col("tag_str").gt_eq(lit("aab"));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![3, 4, 5, 6, 7, 8]);

        let expr = col("tag_i32").gt_eq(lit(2));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![1, 2, 4, 5, 7, 8]);

        let expr = col("field_u64").gt_eq(lit(8u64));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![6, 7, 8]);

        let expr = col("tag_str")
            .gt(lit("aaa"))
            .and(col("tag_str").lt(lit("abc")));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![3, 4, 5]);

        let expr = col("tag_i32").gt(lit(1)).and(col("tag_i32").lt(lit(3)));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![1, 4, 7]);

        let expr = col("field_u64")
            .gt(lit(2u64))
            .and(col("field_u64").lt(lit(9u64)));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![1, 2, 3, 4, 5, 6, 7]);
    }

    #[tokio::test]
    async fn test_create_and_query_regex() {
        let applier_factory = build_applier_factory(
            "test_create_and_query_regex_",
            mock_rows(),
            BatchFormat::PrimaryKey,
        )
        .await;

        let expr = binary_expr(col("tag_str"), Operator::RegexMatch, lit(".*"));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![0, 1, 2, 3, 4, 5, 6, 7, 8]);

        let expr = binary_expr(col("tag_str"), Operator::RegexMatch, lit("a.*c"));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![6, 7, 8]);

        let expr = binary_expr(col("tag_str"), Operator::RegexMatch, lit("a.*b$"));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![3, 4, 5]);

        let expr = binary_expr(col("tag_str"), Operator::RegexMatch, lit("\\w"));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![0, 1, 2, 3, 4, 5, 6, 7, 8]);

        let expr = binary_expr(col("tag_str"), Operator::RegexMatch, lit("\\d"));
        let res = applier_factory(expr).await;
        assert!(res.is_empty());

        let expr = binary_expr(col("tag_str"), Operator::RegexMatch, lit("^aaa$"));
        let res = applier_factory(expr).await;
        assert_eq!(res, vec![0, 1, 2]);
    }
}