// mito2/sst/index/inverted_index/creator.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashSet;
16use std::num::NonZeroUsize;
17use std::sync::Arc;
18use std::sync::atomic::AtomicUsize;
19
20use api::v1::SemanticType;
21use common_telemetry::{debug, warn};
22use datatypes::arrow::record_batch::RecordBatch;
23use datatypes::vectors::Helper;
24use index::inverted_index::create::InvertedIndexCreator;
25use index::inverted_index::create::sort::external_sort::ExternalSorter;
26use index::inverted_index::create::sort_create::SortIndexCreator;
27use index::inverted_index::format::writer::InvertedIndexBlobWriter;
28use index::target::IndexTarget;
29use mito_codec::index::{IndexValueCodec, IndexValuesCodec};
30use mito_codec::row_converter::{CompositeValues, SortField};
31use puffin::puffin_manager::{PuffinWriter, PutOptions};
32use snafu::{ResultExt, ensure};
33use store_api::codec::PrimaryKeyEncoding;
34use store_api::metadata::RegionMetadataRef;
35use store_api::storage::{ColumnId, FileId};
36use tokio::io::duplex;
37use tokio_util::compat::{TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt};
38
39use crate::error::{
40    BiErrorsSnafu, EncodeSnafu, IndexFinishSnafu, OperateAbortedIndexSnafu, PuffinAddBlobSnafu,
41    PushIndexValueSnafu, Result,
42};
43use crate::read::Batch;
44use crate::sst::index::intermediate::{
45    IntermediateLocation, IntermediateManager, TempFileProvider,
46};
47use crate::sst::index::inverted_index::INDEX_BLOB_TYPE;
48use crate::sst::index::puffin_manager::SstPuffinWriter;
49use crate::sst::index::statistics::{ByteCount, RowCount, Statistics};
50use crate::sst::index::{TYPE_INVERTED_INDEX, decode_primary_keys_with_counts};
51
/// The minimum memory usage threshold for one column, passed to the external
/// sorter as its per-column lower bound.
const MIN_MEMORY_USAGE_THRESHOLD_PER_COLUMN: usize = 1024 * 1024; // 1MB

/// The buffer size for the in-memory duplex pipe used to stream index data
/// from the index writer to the puffin blob writer.
const PIPE_BUFFER_SIZE_FOR_SENDING_BLOB: usize = 8192;
57
/// `InvertedIndexer` creates inverted index for SST files.
pub struct InvertedIndexer {
    /// The index creator.
    index_creator: Box<dyn InvertedIndexCreator>,
    /// The provider of intermediate files used by the external sorter.
    temp_file_provider: Arc<TempFileProvider>,

    /// Codec for decoding primary keys.
    codec: IndexValuesCodec,
    /// Reusable buffer for encoding index values; cleared before each encode.
    value_buf: Vec<u8>,

    /// Statistics of index creation (row/byte counts, phase timings).
    stats: Statistics,
    /// Whether the index creation is aborted. Once set, all operations
    /// other than `abort` fail with `OperateAbortedIndex`.
    aborted: bool,

    /// The memory usage of the index creator, shared with the external sorter.
    memory_usage: Arc<AtomicUsize>,

    /// Ids of indexed columns and their encoded target keys.
    indexed_column_ids: Vec<(ColumnId, String)>,

    /// Region metadata for column lookups.
    metadata: RegionMetadataRef,
}
84
85impl InvertedIndexer {
86    /// Creates a new `InvertedIndexer`.
87    /// Should ensure that the number of tag columns is greater than 0.
88    pub fn new(
89        sst_file_id: FileId,
90        metadata: &RegionMetadataRef,
91        intermediate_manager: IntermediateManager,
92        memory_usage_threshold: Option<usize>,
93        segment_row_count: NonZeroUsize,
94        indexed_column_ids: HashSet<ColumnId>,
95    ) -> Self {
96        let temp_file_provider = Arc::new(TempFileProvider::new(
97            IntermediateLocation::new(&metadata.region_id, &sst_file_id),
98            intermediate_manager,
99        ));
100
101        let memory_usage = Arc::new(AtomicUsize::new(0));
102
103        let sorter = ExternalSorter::factory(
104            temp_file_provider.clone() as _,
105            Some(MIN_MEMORY_USAGE_THRESHOLD_PER_COLUMN),
106            memory_usage.clone(),
107            memory_usage_threshold,
108        );
109        let index_creator = Box::new(SortIndexCreator::new(sorter, segment_row_count));
110
111        let codec = IndexValuesCodec::from_tag_columns(
112            metadata.primary_key_encoding,
113            metadata.primary_key_columns(),
114        );
115        let indexed_column_ids = indexed_column_ids
116            .into_iter()
117            .map(|col_id| {
118                let target_key = format!("{}", IndexTarget::ColumnId(col_id));
119                (col_id, target_key)
120            })
121            .collect();
122        Self {
123            codec,
124            index_creator,
125            temp_file_provider,
126            value_buf: vec![],
127            stats: Statistics::new(TYPE_INVERTED_INDEX),
128            aborted: false,
129            memory_usage,
130            indexed_column_ids,
131            metadata: metadata.clone(),
132        }
133    }
134
135    /// Updates index with a batch of rows.
136    /// Garbage will be cleaned up if failed to update.
137    pub async fn update(&mut self, batch: &mut Batch) -> Result<()> {
138        ensure!(!self.aborted, OperateAbortedIndexSnafu);
139
140        if batch.is_empty() {
141            return Ok(());
142        }
143
144        if let Err(update_err) = self.do_update(batch).await {
145            // clean up garbage if failed to update
146            if let Err(err) = self.do_cleanup().await {
147                if cfg!(any(test, feature = "test")) {
148                    panic!("Failed to clean up index creator, err: {err}",);
149                } else {
150                    warn!(err; "Failed to clean up index creator");
151                }
152            }
153            return Err(update_err);
154        }
155
156        Ok(())
157    }
158
159    /// Updates the inverted index with the given flat format RecordBatch.
160    pub async fn update_flat(&mut self, batch: &RecordBatch) -> Result<()> {
161        ensure!(!self.aborted, OperateAbortedIndexSnafu);
162
163        if batch.num_rows() == 0 {
164            return Ok(());
165        }
166
167        self.do_update_flat(batch).await
168    }
169
170    async fn do_update_flat(&mut self, batch: &RecordBatch) -> Result<()> {
171        let mut guard = self.stats.record_update();
172
173        guard.inc_row_count(batch.num_rows());
174
175        let is_sparse = self.metadata.primary_key_encoding == PrimaryKeyEncoding::Sparse;
176        let mut decoded_pks: Option<Vec<(CompositeValues, usize)>> = None;
177
178        for (col_id, target_key) in &self.indexed_column_ids {
179            let Some(column_meta) = self.metadata.column_by_id(*col_id) else {
180                debug!(
181                    "Column {} not found in the metadata during building inverted index",
182                    col_id
183                );
184                continue;
185            };
186            let column_name = &column_meta.column_schema.name;
187            if let Some(column_array) = batch.column_by_name(column_name) {
188                // Convert Arrow array to VectorRef using Helper
189                let vector = Helper::try_into_vector(column_array.clone())
190                    .context(crate::error::ConvertVectorSnafu)?;
191                let sort_field = SortField::new(vector.data_type());
192
193                for row in 0..batch.num_rows() {
194                    self.value_buf.clear();
195                    let value_ref = vector.get_ref(row);
196
197                    if value_ref.is_null() {
198                        self.index_creator
199                            .push_with_name(target_key, None)
200                            .await
201                            .context(PushIndexValueSnafu)?;
202                    } else {
203                        IndexValueCodec::encode_nonnull_value(
204                            value_ref,
205                            &sort_field,
206                            &mut self.value_buf,
207                        )
208                        .context(EncodeSnafu)?;
209                        self.index_creator
210                            .push_with_name(target_key, Some(&self.value_buf))
211                            .await
212                            .context(PushIndexValueSnafu)?;
213                    }
214                }
215            } else if is_sparse && column_meta.semantic_type == SemanticType::Tag {
216                // Column not found in batch, tries to decode from primary keys for sparse encoding.
217                if decoded_pks.is_none() {
218                    decoded_pks = Some(decode_primary_keys_with_counts(batch, &self.codec)?);
219                }
220
221                let pk_values_with_counts = decoded_pks.as_ref().unwrap();
222                let Some(col_info) = self.codec.pk_col_info(*col_id) else {
223                    debug!(
224                        "Column {} not found in primary key during building bloom filter index",
225                        column_name
226                    );
227                    continue;
228                };
229                let pk_index = col_info.idx;
230                let field = &col_info.field;
231                for (decoded, count) in pk_values_with_counts {
232                    let value = match decoded {
233                        CompositeValues::Dense(dense) => dense.get(pk_index).map(|v| &v.1),
234                        CompositeValues::Sparse(sparse) => sparse.get(col_id),
235                    };
236
237                    let elem = value
238                        .filter(|v| !v.is_null())
239                        .map(|v| {
240                            self.value_buf.clear();
241                            IndexValueCodec::encode_nonnull_value(
242                                v.as_value_ref(),
243                                field,
244                                &mut self.value_buf,
245                            )
246                            .context(EncodeSnafu)?;
247                            Ok(self.value_buf.as_slice())
248                        })
249                        .transpose()?;
250
251                    self.index_creator
252                        .push_with_name_n(target_key, elem, *count)
253                        .await
254                        .context(PushIndexValueSnafu)?;
255                }
256            } else {
257                debug!(
258                    "Column {} not found in the batch during building inverted index",
259                    col_id
260                );
261            }
262        }
263
264        Ok(())
265    }
266
267    /// Finishes index creation and cleans up garbage.
268    /// Returns the number of rows and bytes written.
269    pub async fn finish(
270        &mut self,
271        puffin_writer: &mut SstPuffinWriter,
272    ) -> Result<(RowCount, ByteCount)> {
273        ensure!(!self.aborted, OperateAbortedIndexSnafu);
274
275        if self.stats.row_count() == 0 {
276            // no IO is performed, no garbage to clean up, just return
277            return Ok((0, 0));
278        }
279
280        let finish_res = self.do_finish(puffin_writer).await;
281        // clean up garbage no matter finish successfully or not
282        if let Err(err) = self.do_cleanup().await {
283            if cfg!(any(test, feature = "test")) {
284                panic!("Failed to clean up index creator, err: {err}",);
285            } else {
286                warn!(err; "Failed to clean up index creator");
287            }
288        }
289
290        finish_res.map(|_| (self.stats.row_count(), self.stats.byte_count()))
291    }
292
293    /// Aborts index creation and clean up garbage.
294    pub async fn abort(&mut self) -> Result<()> {
295        if self.aborted {
296            return Ok(());
297        }
298        self.aborted = true;
299
300        self.do_cleanup().await
301    }
302
    /// Pushes every row of `batch` into the index creator for each indexed
    /// column.
    ///
    /// Tag columns are decoded from the batch's primary key — the batch
    /// carries a single primary key, so each tag value is encoded once and
    /// pushed with the batch's row count. Field columns are read row by row.
    async fn do_update(&mut self, batch: &mut Batch) -> Result<()> {
        let mut guard = self.stats.record_update();

        let n = batch.num_rows();
        guard.inc_row_count(n);

        for (col_id, target_key) in &self.indexed_column_ids {
            match self.codec.pk_col_info(*col_id) {
                // pk
                Some(col_info) => {
                    let pk_idx = col_info.idx;
                    let field = &col_info.field;
                    // One value for the whole batch: nulls are skipped by the
                    // filter, non-nulls are encoded into the reusable buffer.
                    let value = batch
                        .pk_col_value(self.codec.decoder(), pk_idx, *col_id)?
                        .filter(|v| !v.is_null())
                        .map(|v| {
                            self.value_buf.clear();
                            IndexValueCodec::encode_nonnull_value(
                                v.as_value_ref(),
                                field,
                                &mut self.value_buf,
                            )
                            .context(EncodeSnafu)?;
                            Ok(self.value_buf.as_slice())
                        })
                        .transpose()?;

                    // Push the (possibly null) tag value `n` times in one call.
                    self.index_creator
                        .push_with_name_n(target_key, value, n)
                        .await
                        .context(PushIndexValueSnafu)?;
                }
                // fields
                None => {
                    let Some(values) = batch.field_col_value(*col_id) else {
                        debug!(
                            "Column {} not found in the batch during building inverted index",
                            col_id
                        );
                        continue;
                    };
                    let sort_field = SortField::new(values.data.data_type());
                    // Field values differ per row; encode and push each one.
                    for i in 0..n {
                        self.value_buf.clear();
                        let value = values.data.get_ref(i);
                        if value.is_null() {
                            // Null values are pushed without a payload.
                            self.index_creator
                                .push_with_name(target_key, None)
                                .await
                                .context(PushIndexValueSnafu)?;
                        } else {
                            IndexValueCodec::encode_nonnull_value(
                                value,
                                &sort_field,
                                &mut self.value_buf,
                            )
                            .context(EncodeSnafu)?;
                            self.index_creator
                                .push_with_name(target_key, Some(&self.value_buf))
                                .await
                                .context(PushIndexValueSnafu)?;
                        }
                    }
                }
            }
        }

        Ok(())
    }
372
    /// Data flow of finishing index:
    ///
    /// ```text
    ///                               (In Memory Buffer)
    ///                                    ┌──────┐
    ///  ┌─────────────┐                   │ PIPE │
    ///  │             │ write index data  │      │
    ///  │ IndexWriter ├──────────────────►│ tx   │
    ///  │             │                   │      │
    ///  └─────────────┘                   │      │
    ///                  ┌─────────────────┤ rx   │
    ///  ┌─────────────┐ │ read as blob    └──────┘
    ///  │             │ │
    ///  │ PuffinWriter├─┤
    ///  │             │ │ copy to file    ┌──────┐
    ///  └─────────────┘ └────────────────►│ File │
    ///                                    └──────┘
    /// ```
    async fn do_finish(&mut self, puffin_writer: &mut SstPuffinWriter) -> Result<()> {
        let mut guard = self.stats.record_finish();

        // Bounded in-memory pipe: the index creator writes to `tx` while the
        // puffin writer drains `rx`, so the blob is streamed rather than
        // materialized in full.
        let (tx, rx) = duplex(PIPE_BUFFER_SIZE_FOR_SENDING_BLOB);
        let mut index_writer = InvertedIndexBlobWriter::new(tx.compat_write());

        // Producer and consumer must run concurrently in the same join;
        // awaiting them sequentially would stall once the pipe buffer fills.
        let (index_finish, puffin_add_blob) = futures::join!(
            // TODO(zhongzc): config bitmap type
            self.index_creator
                .finish(&mut index_writer, index::bitmap::BitmapType::Roaring),
            puffin_writer.put_blob(
                INDEX_BLOB_TYPE,
                rx.compat(),
                PutOptions::default(),
                Default::default(),
            )
        );

        // Combine the two results: report both errors if both sides failed,
        // propagate a single failure, or record written bytes on success.
        match (
            puffin_add_blob.context(PuffinAddBlobSnafu),
            index_finish.context(IndexFinishSnafu),
        ) {
            (Err(e1), Err(e2)) => BiErrorsSnafu {
                first: Box::new(e1),
                second: Box::new(e2),
            }
            .fail()?,

            (Ok(_), e @ Err(_)) => e?,
            (e @ Err(_), Ok(_)) => e.map(|_| ())?,
            (Ok(written_bytes), Ok(_)) => {
                guard.inc_byte_count(written_bytes);
            }
        }

        Ok(())
    }
428
429    async fn do_cleanup(&mut self) -> Result<()> {
430        let _guard = self.stats.record_cleanup();
431
432        self.temp_file_provider.cleanup().await
433    }
434
435    pub fn column_ids(&self) -> impl Iterator<Item = ColumnId> + '_ {
436        self.indexed_column_ids.iter().map(|(col_id, _)| *col_id)
437    }
438
439    pub fn memory_usage(&self) -> usize {
440        self.memory_usage.load(std::sync::atomic::Ordering::Relaxed)
441    }
442}
443
444#[cfg(test)]
445mod tests {
446    use std::collections::BTreeSet;
447
448    use api::v1::SemanticType;
449    use datafusion_expr::{Expr as DfExpr, Operator, binary_expr, col, lit};
450    use datatypes::data_type::ConcreteDataType;
451    use datatypes::schema::ColumnSchema;
452    use datatypes::value::ValueRef;
453    use datatypes::vectors::{UInt8Vector, UInt64Vector};
454    use futures::future::BoxFuture;
455    use mito_codec::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodecExt};
456    use object_store::ObjectStore;
457    use object_store::services::Memory;
458    use puffin::puffin_manager::PuffinManager;
459    use puffin::puffin_manager::cache::PuffinMetadataCache;
460    use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
461    use store_api::region_request::PathType;
462    use store_api::storage::RegionId;
463
464    use super::*;
465    use crate::access_layer::RegionFilePathFactory;
466    use crate::cache::index::inverted_index::InvertedIndexCache;
467    use crate::metrics::CACHE_BYTES;
468    use crate::read::BatchColumn;
469    use crate::sst::file::RegionFileId;
470    use crate::sst::index::inverted_index::applier::builder::InvertedIndexApplierBuilder;
471    use crate::sst::index::puffin_manager::PuffinManagerFactory;
472
473    fn mock_object_store() -> ObjectStore {
474        ObjectStore::new(Memory::default()).unwrap().finish()
475    }
476
477    async fn new_intm_mgr(path: impl AsRef<str>) -> IntermediateManager {
478        IntermediateManager::init_fs(path).await.unwrap()
479    }
480
481    fn mock_region_metadata() -> RegionMetadataRef {
482        let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 2));
483        builder
484            .push_column_metadata(ColumnMetadata {
485                column_schema: ColumnSchema::new(
486                    "tag_str",
487                    ConcreteDataType::string_datatype(),
488                    false,
489                ),
490                semantic_type: SemanticType::Tag,
491                column_id: 1,
492            })
493            .push_column_metadata(ColumnMetadata {
494                column_schema: ColumnSchema::new(
495                    "tag_i32",
496                    ConcreteDataType::int32_datatype(),
497                    false,
498                ),
499                semantic_type: SemanticType::Tag,
500                column_id: 2,
501            })
502            .push_column_metadata(ColumnMetadata {
503                column_schema: ColumnSchema::new(
504                    "ts",
505                    ConcreteDataType::timestamp_millisecond_datatype(),
506                    false,
507                ),
508                semantic_type: SemanticType::Timestamp,
509                column_id: 3,
510            })
511            .push_column_metadata(ColumnMetadata {
512                column_schema: ColumnSchema::new(
513                    "field_u64",
514                    ConcreteDataType::uint64_datatype(),
515                    false,
516                ),
517                semantic_type: SemanticType::Field,
518                column_id: 4,
519            })
520            .primary_key(vec![1, 2]);
521
522        Arc::new(builder.build().unwrap())
523    }
524
525    fn new_batch(
526        str_tag: impl AsRef<str>,
527        i32_tag: impl Into<i32>,
528        u64_field: impl IntoIterator<Item = u64>,
529    ) -> Batch {
530        let fields = vec![
531            (0, SortField::new(ConcreteDataType::string_datatype())),
532            (1, SortField::new(ConcreteDataType::int32_datatype())),
533        ];
534        let codec = DensePrimaryKeyCodec::with_fields(fields);
535        let row: [ValueRef; 2] = [str_tag.as_ref().into(), i32_tag.into().into()];
536        let primary_key = codec.encode(row.into_iter()).unwrap();
537
538        let u64_field = BatchColumn {
539            column_id: 4,
540            data: Arc::new(UInt64Vector::from_iter_values(u64_field)),
541        };
542        let num_rows = u64_field.data.len();
543
544        Batch::new(
545            primary_key,
546            Arc::new(UInt64Vector::from_iter_values(std::iter::repeat_n(
547                0, num_rows,
548            ))),
549            Arc::new(UInt64Vector::from_iter_values(std::iter::repeat_n(
550                0, num_rows,
551            ))),
552            Arc::new(UInt8Vector::from_iter_values(std::iter::repeat_n(
553                1, num_rows,
554            ))),
555            vec![u64_field],
556        )
557        .unwrap()
558    }
559
    /// Builds an inverted index over `rows`, writes it into a puffin file,
    /// and returns a factory closure that evaluates a predicate expression
    /// against the index and yields the matched segment ids.
    async fn build_applier_factory(
        prefix: &str,
        rows: BTreeSet<(&'static str, i32, [u64; 2])>,
    ) -> impl Fn(DfExpr) -> BoxFuture<'static, Vec<usize>> {
        let (d, factory) = PuffinManagerFactory::new_for_test_async(prefix).await;
        let table_dir = "table0".to_string();
        let sst_file_id = FileId::random();
        let object_store = mock_object_store();
        let region_metadata = mock_region_metadata();
        let intm_mgr = new_intm_mgr(d.path().to_string_lossy()).await;
        let memory_threshold = None;
        // Each two-row batch spans exactly one segment.
        let segment_row_count = 2;
        let indexed_column_ids = HashSet::from_iter([1, 2, 4]);

        let mut creator = InvertedIndexer::new(
            sst_file_id,
            &region_metadata,
            intm_mgr,
            memory_threshold,
            NonZeroUsize::new(segment_row_count).unwrap(),
            indexed_column_ids.clone(),
        );

        // One batch per (str_tag, i32_tag) primary key, two field rows each.
        for (str_tag, i32_tag, u64_field) in &rows {
            let mut batch = new_batch(str_tag, *i32_tag, u64_field.iter().copied());
            creator.update(&mut batch).await.unwrap();
        }

        let puffin_manager = factory.build(
            object_store.clone(),
            RegionFilePathFactory::new(table_dir.clone(), PathType::Bare),
        );

        let sst_file_id = RegionFileId::new(region_metadata.region_id, sst_file_id);
        let mut writer = puffin_manager.writer(&sst_file_id).await.unwrap();
        let (row_count, _) = creator.finish(&mut writer).await.unwrap();
        assert_eq!(row_count, rows.len() * segment_row_count);
        writer.finish().await.unwrap();

        move |expr| {
            // Capture `d` to keep the temp dir alive as long as the closure.
            let _d = &d;
            let cache = Arc::new(InvertedIndexCache::new(10, 10, 100));
            let puffin_metadata_cache = Arc::new(PuffinMetadataCache::new(10, &CACHE_BYTES));
            let applier = InvertedIndexApplierBuilder::new(
                table_dir.clone(),
                PathType::Bare,
                object_store.clone(),
                &region_metadata,
                indexed_column_ids.clone(),
                factory.clone(),
            )
            .with_inverted_index_cache(Some(cache))
            .with_puffin_metadata_cache(Some(puffin_metadata_cache))
            .build(&[expr])
            .unwrap()
            .unwrap();
            Box::pin(async move {
                applier
                    .apply(sst_file_id, None)
                    .await
                    .unwrap()
                    .matched_segment_ids
                    .iter_ones()
                    .collect()
            })
        }
    }
627
628    #[tokio::test]
629    async fn test_create_and_query_get_key() {
630        let rows = BTreeSet::from_iter([
631            ("aaa", 1, [1, 2]),
632            ("aaa", 2, [2, 3]),
633            ("aaa", 3, [3, 4]),
634            ("aab", 1, [4, 5]),
635            ("aab", 2, [5, 6]),
636            ("aab", 3, [6, 7]),
637            ("abc", 1, [7, 8]),
638            ("abc", 2, [8, 9]),
639            ("abc", 3, [9, 10]),
640        ]);
641
642        let applier_factory = build_applier_factory("test_create_and_query_get_key_", rows).await;
643
644        let expr = col("tag_str").eq(lit("aaa"));
645        let res = applier_factory(expr).await;
646        assert_eq!(res, vec![0, 1, 2]);
647
648        let expr = col("tag_i32").eq(lit(2));
649        let res = applier_factory(expr).await;
650        assert_eq!(res, vec![1, 4, 7]);
651
652        let expr = col("tag_str").eq(lit("aaa")).and(col("tag_i32").eq(lit(2)));
653        let res = applier_factory(expr).await;
654        assert_eq!(res, vec![1]);
655
656        let expr = col("tag_str")
657            .eq(lit("aaa"))
658            .or(col("tag_str").eq(lit("abc")));
659        let res = applier_factory(expr).await;
660        assert_eq!(res, vec![0, 1, 2, 6, 7, 8]);
661
662        let expr = col("tag_str").in_list(vec![lit("aaa"), lit("abc")], false);
663        let res = applier_factory(expr).await;
664        assert_eq!(res, vec![0, 1, 2, 6, 7, 8]);
665
666        let expr = col("field_u64").eq(lit(2u64));
667        let res = applier_factory(expr).await;
668        assert_eq!(res, vec![0, 1]);
669    }
670
671    #[tokio::test]
672    async fn test_create_and_query_range() {
673        let rows = BTreeSet::from_iter([
674            ("aaa", 1, [1, 2]),
675            ("aaa", 2, [2, 3]),
676            ("aaa", 3, [3, 4]),
677            ("aab", 1, [4, 5]),
678            ("aab", 2, [5, 6]),
679            ("aab", 3, [6, 7]),
680            ("abc", 1, [7, 8]),
681            ("abc", 2, [8, 9]),
682            ("abc", 3, [9, 10]),
683        ]);
684
685        let applier_factory = build_applier_factory("test_create_and_query_range_", rows).await;
686
687        let expr = col("tag_str").between(lit("aaa"), lit("aab"));
688        let res = applier_factory(expr).await;
689        assert_eq!(res, vec![0, 1, 2, 3, 4, 5]);
690
691        let expr = col("tag_i32").between(lit(2), lit(3));
692        let res = applier_factory(expr).await;
693        assert_eq!(res, vec![1, 2, 4, 5, 7, 8]);
694
695        let expr = col("tag_str").between(lit("aaa"), lit("aaa"));
696        let res = applier_factory(expr).await;
697        assert_eq!(res, vec![0, 1, 2]);
698
699        let expr = col("tag_i32").between(lit(2), lit(2));
700        let res = applier_factory(expr).await;
701        assert_eq!(res, vec![1, 4, 7]);
702
703        let expr = col("field_u64").between(lit(2u64), lit(5u64));
704        let res = applier_factory(expr).await;
705        assert_eq!(res, vec![0, 1, 2, 3, 4]);
706    }
707
708    #[tokio::test]
709    async fn test_create_and_query_comparison() {
710        let rows = BTreeSet::from_iter([
711            ("aaa", 1, [1, 2]),
712            ("aaa", 2, [2, 3]),
713            ("aaa", 3, [3, 4]),
714            ("aab", 1, [4, 5]),
715            ("aab", 2, [5, 6]),
716            ("aab", 3, [6, 7]),
717            ("abc", 1, [7, 8]),
718            ("abc", 2, [8, 9]),
719            ("abc", 3, [9, 10]),
720        ]);
721
722        let applier_factory =
723            build_applier_factory("test_create_and_query_comparison_", rows).await;
724
725        let expr = col("tag_str").lt(lit("aab"));
726        let res = applier_factory(expr).await;
727        assert_eq!(res, vec![0, 1, 2]);
728
729        let expr = col("tag_i32").lt(lit(2));
730        let res = applier_factory(expr).await;
731        assert_eq!(res, vec![0, 3, 6]);
732
733        let expr = col("field_u64").lt(lit(2u64));
734        let res = applier_factory(expr).await;
735        assert_eq!(res, vec![0]);
736
737        let expr = col("tag_str").gt(lit("aab"));
738        let res = applier_factory(expr).await;
739        assert_eq!(res, vec![6, 7, 8]);
740
741        let expr = col("tag_i32").gt(lit(2));
742        let res = applier_factory(expr).await;
743        assert_eq!(res, vec![2, 5, 8]);
744
745        let expr = col("field_u64").gt(lit(8u64));
746        let res = applier_factory(expr).await;
747        assert_eq!(res, vec![7, 8]);
748
749        let expr = col("tag_str").lt_eq(lit("aab"));
750        let res = applier_factory(expr).await;
751        assert_eq!(res, vec![0, 1, 2, 3, 4, 5]);
752
753        let expr = col("tag_i32").lt_eq(lit(2));
754        let res = applier_factory(expr).await;
755        assert_eq!(res, vec![0, 1, 3, 4, 6, 7]);
756
757        let expr = col("field_u64").lt_eq(lit(2u64));
758        let res = applier_factory(expr).await;
759        assert_eq!(res, vec![0, 1]);
760
761        let expr = col("tag_str").gt_eq(lit("aab"));
762        let res = applier_factory(expr).await;
763        assert_eq!(res, vec![3, 4, 5, 6, 7, 8]);
764
765        let expr = col("tag_i32").gt_eq(lit(2));
766        let res = applier_factory(expr).await;
767        assert_eq!(res, vec![1, 2, 4, 5, 7, 8]);
768
769        let expr = col("field_u64").gt_eq(lit(8u64));
770        let res = applier_factory(expr).await;
771        assert_eq!(res, vec![6, 7, 8]);
772
773        let expr = col("tag_str")
774            .gt(lit("aaa"))
775            .and(col("tag_str").lt(lit("abc")));
776        let res = applier_factory(expr).await;
777        assert_eq!(res, vec![3, 4, 5]);
778
779        let expr = col("tag_i32").gt(lit(1)).and(col("tag_i32").lt(lit(3)));
780        let res = applier_factory(expr).await;
781        assert_eq!(res, vec![1, 4, 7]);
782
783        let expr = col("field_u64")
784            .gt(lit(2u64))
785            .and(col("field_u64").lt(lit(9u64)));
786        let res = applier_factory(expr).await;
787        assert_eq!(res, vec![1, 2, 3, 4, 5, 6, 7]);
788    }
789
790    #[tokio::test]
791    async fn test_create_and_query_regex() {
792        let rows = BTreeSet::from_iter([
793            ("aaa", 1, [1, 2]),
794            ("aaa", 2, [2, 3]),
795            ("aaa", 3, [3, 4]),
796            ("aab", 1, [4, 5]),
797            ("aab", 2, [5, 6]),
798            ("aab", 3, [6, 7]),
799            ("abc", 1, [7, 8]),
800            ("abc", 2, [8, 9]),
801            ("abc", 3, [9, 10]),
802        ]);
803
804        let applier_factory = build_applier_factory("test_create_and_query_regex_", rows).await;
805
806        let expr = binary_expr(col("tag_str"), Operator::RegexMatch, lit(".*"));
807        let res = applier_factory(expr).await;
808        assert_eq!(res, vec![0, 1, 2, 3, 4, 5, 6, 7, 8]);
809
810        let expr = binary_expr(col("tag_str"), Operator::RegexMatch, lit("a.*c"));
811        let res = applier_factory(expr).await;
812        assert_eq!(res, vec![6, 7, 8]);
813
814        let expr = binary_expr(col("tag_str"), Operator::RegexMatch, lit("a.*b$"));
815        let res = applier_factory(expr).await;
816        assert_eq!(res, vec![3, 4, 5]);
817
818        let expr = binary_expr(col("tag_str"), Operator::RegexMatch, lit("\\w"));
819        let res = applier_factory(expr).await;
820        assert_eq!(res, vec![0, 1, 2, 3, 4, 5, 6, 7, 8]);
821
822        let expr = binary_expr(col("tag_str"), Operator::RegexMatch, lit("\\d"));
823        let res = applier_factory(expr).await;
824        assert!(res.is_empty());
825
826        let expr = binary_expr(col("tag_str"), Operator::RegexMatch, lit("^aaa$"));
827        let res = applier_factory(expr).await;
828        assert_eq!(res, vec![0, 1, 2]);
829    }
830}