mito2/sst/index/fulltext_index/
creator.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::sync::atomic::AtomicUsize;
17use std::sync::Arc;
18
19use common_telemetry::warn;
20use datatypes::arrow::array::{Array, StringArray};
21use datatypes::arrow::datatypes::DataType;
22use datatypes::arrow::record_batch::RecordBatch;
23use datatypes::schema::{FulltextAnalyzer, FulltextBackend};
24use index::fulltext_index::create::{
25    BloomFilterFulltextIndexCreator, FulltextIndexCreator, TantivyFulltextIndexCreator,
26};
27use index::fulltext_index::{Analyzer, Config};
28use puffin::blob_metadata::CompressionCodec;
29use puffin::puffin_manager::PutOptions;
30use snafu::{ensure, ResultExt};
31use store_api::metadata::RegionMetadataRef;
32use store_api::storage::{ColumnId, ConcreteDataType, RegionId};
33
34use crate::error::{
35    CastVectorSnafu, ComputeArrowSnafu, CreateFulltextCreatorSnafu, DataTypeMismatchSnafu,
36    FulltextFinishSnafu, FulltextPushTextSnafu, IndexOptionsSnafu, OperateAbortedIndexSnafu,
37    Result,
38};
39use crate::read::Batch;
40use crate::sst::file::FileId;
41use crate::sst::index::fulltext_index::{INDEX_BLOB_TYPE_BLOOM, INDEX_BLOB_TYPE_TANTIVY};
42use crate::sst::index::intermediate::{
43    IntermediateLocation, IntermediateManager, TempFileProvider,
44};
45use crate::sst::index::puffin_manager::SstPuffinWriter;
46use crate::sst::index::statistics::{ByteCount, RowCount, Statistics};
47use crate::sst::index::TYPE_FULLTEXT_INDEX;
48
/// `FulltextIndexer` is responsible for creating fulltext indexes for SST files.
///
/// It owns one `SingleCreator` per indexed column and drives all of them
/// together through `update`/`update_flat`, `finish` and `abort`.
pub struct FulltextIndexer {
    /// Creators for each column, keyed by column ID.
    creators: HashMap<ColumnId, SingleCreator>,
    /// Whether the index creation was aborted.
    /// Once set, `update`, `update_flat` and `finish` return an error.
    aborted: bool,
    /// Statistics of index creation.
    stats: Statistics,
}
58
59impl FulltextIndexer {
60    /// Creates a new `FulltextIndexer`.
61    pub async fn new(
62        region_id: &RegionId,
63        sst_file_id: &FileId,
64        intermediate_manager: &IntermediateManager,
65        metadata: &RegionMetadataRef,
66        compress: bool,
67        mem_limit: usize,
68    ) -> Result<Option<Self>> {
69        let mut creators = HashMap::new();
70
71        for column in &metadata.column_metadatas {
72            let options = column
73                .column_schema
74                .fulltext_options()
75                .context(IndexOptionsSnafu {
76                    column_name: &column.column_schema.name,
77                })?;
78
79            // Relax the type constraint here as many types can be casted to string.
80
81            let options = match options {
82                Some(options) if options.enable => options,
83                _ => continue,
84            };
85
86            let column_id = column.column_id;
87            let intm_path = intermediate_manager.fulltext_path(region_id, sst_file_id, column_id);
88
89            let config = Config {
90                analyzer: match options.analyzer {
91                    FulltextAnalyzer::English => Analyzer::English,
92                    FulltextAnalyzer::Chinese => Analyzer::Chinese,
93                },
94                case_sensitive: options.case_sensitive,
95            };
96
97            let inner = match options.backend {
98                FulltextBackend::Tantivy => {
99                    let creator = TantivyFulltextIndexCreator::new(&intm_path, config, mem_limit)
100                        .await
101                        .context(CreateFulltextCreatorSnafu)?;
102                    AltFulltextCreator::Tantivy(creator)
103                }
104                FulltextBackend::Bloom => {
105                    let temp_file_provider = Arc::new(TempFileProvider::new(
106                        IntermediateLocation::new(&metadata.region_id, sst_file_id),
107                        intermediate_manager.clone(),
108                    ));
109                    let global_memory_usage = Arc::new(AtomicUsize::new(0));
110                    let creator = BloomFilterFulltextIndexCreator::new(
111                        config,
112                        options.granularity as _,
113                        options.false_positive_rate(),
114                        temp_file_provider,
115                        global_memory_usage,
116                        Some(mem_limit),
117                    );
118                    AltFulltextCreator::Bloom(creator)
119                }
120            };
121
122            creators.insert(
123                column_id,
124                SingleCreator {
125                    column_id,
126                    column_name: column.column_schema.name.clone(),
127                    inner,
128                    compress,
129                },
130            );
131        }
132
133        Ok((!creators.is_empty()).then(move || Self {
134            creators,
135            aborted: false,
136            stats: Statistics::new(TYPE_FULLTEXT_INDEX),
137        }))
138    }
139
140    /// Updates the index with the given batch.
141    pub async fn update(&mut self, batch: &mut Batch) -> Result<()> {
142        ensure!(!self.aborted, OperateAbortedIndexSnafu);
143
144        if let Err(update_err) = self.do_update(batch).await {
145            if let Err(err) = self.do_abort().await {
146                if cfg!(any(test, feature = "test")) {
147                    panic!("Failed to abort index creator, err: {err}");
148                } else {
149                    warn!(err; "Failed to abort index creator");
150                }
151            }
152            return Err(update_err);
153        }
154
155        Ok(())
156    }
157
158    /// Updates the fulltext index with the given flat format RecordBatch.
159    pub async fn update_flat(&mut self, batch: &RecordBatch) -> Result<()> {
160        ensure!(!self.aborted, OperateAbortedIndexSnafu);
161
162        if batch.num_rows() == 0 {
163            return Ok(());
164        }
165
166        if let Err(update_err) = self.do_update_flat(batch).await {
167            if let Err(err) = self.do_abort().await {
168                if cfg!(any(test, feature = "test")) {
169                    panic!("Failed to abort index creator, err: {err}");
170                } else {
171                    warn!(err; "Failed to abort index creator");
172                }
173            }
174            return Err(update_err);
175        }
176
177        Ok(())
178    }
179
180    /// Finalizes the index creation.
181    pub async fn finish(
182        &mut self,
183        puffin_writer: &mut SstPuffinWriter,
184    ) -> Result<(RowCount, ByteCount)> {
185        ensure!(!self.aborted, OperateAbortedIndexSnafu);
186
187        match self.do_finish(puffin_writer).await {
188            Ok(()) => Ok((self.stats.row_count(), self.stats.byte_count())),
189            Err(finish_err) => {
190                if let Err(err) = self.do_abort().await {
191                    if cfg!(any(test, feature = "test")) {
192                        panic!("Failed to abort index creator, err: {err}");
193                    } else {
194                        warn!(err; "Failed to abort index creator");
195                    }
196                }
197                Err(finish_err)
198            }
199        }
200    }
201
202    /// Aborts the index creation.
203    pub async fn abort(&mut self) -> Result<()> {
204        if self.aborted {
205            return Ok(());
206        }
207
208        self.do_abort().await
209    }
210
211    /// Returns the memory usage of the index creator.
212    pub fn memory_usage(&self) -> usize {
213        self.creators.values().map(|c| c.inner.memory_usage()).sum()
214    }
215
216    /// Returns IDs of columns that the creator is responsible for.
217    pub fn column_ids(&self) -> impl Iterator<Item = ColumnId> + '_ {
218        self.creators.keys().copied()
219    }
220}
221
222impl FulltextIndexer {
223    async fn do_update(&mut self, batch: &mut Batch) -> Result<()> {
224        let mut guard = self.stats.record_update();
225        guard.inc_row_count(batch.num_rows());
226
227        for creator in self.creators.values_mut() {
228            creator.update(batch).await?;
229        }
230
231        Ok(())
232    }
233
234    async fn do_update_flat(&mut self, batch: &RecordBatch) -> Result<()> {
235        let mut guard = self.stats.record_update();
236        guard.inc_row_count(batch.num_rows());
237
238        for creator in self.creators.values_mut() {
239            creator.update_flat(batch).await?;
240        }
241
242        Ok(())
243    }
244
245    async fn do_finish(&mut self, puffin_writer: &mut SstPuffinWriter) -> Result<()> {
246        let mut guard = self.stats.record_finish();
247
248        let mut written_bytes = 0;
249        for creator in self.creators.values_mut() {
250            written_bytes += creator.finish(puffin_writer).await?;
251        }
252
253        guard.inc_byte_count(written_bytes);
254        Ok(())
255    }
256
257    async fn do_abort(&mut self) -> Result<()> {
258        let _guard = self.stats.record_cleanup();
259
260        self.aborted = true;
261
262        for (_, mut creator) in self.creators.drain() {
263            creator.abort().await?;
264        }
265
266        Ok(())
267    }
268}
269
/// `SingleCreator` is a creator for a single column.
///
/// It locates its column in each incoming batch, pushes one text per row into
/// the wrapped backend creator, and writes the result out on `finish`.
struct SingleCreator {
    /// Column ID. Used to find the column in a `Batch` and to build the blob key.
    column_id: ColumnId,
    /// Column name. Used to find the column in a flat `RecordBatch`.
    column_name: String,
    /// Inner creator.
    inner: AltFulltextCreator,
    /// Whether the index should be compressed (Zstd is used when enabled).
    compress: bool,
}
281
282impl SingleCreator {
283    async fn update(&mut self, batch: &mut Batch) -> Result<()> {
284        let text_column = batch
285            .fields()
286            .iter()
287            .find(|c| c.column_id == self.column_id);
288        match text_column {
289            Some(column) => {
290                let data = column
291                    .data
292                    .cast(&ConcreteDataType::string_datatype())
293                    .context(CastVectorSnafu {
294                        from: column.data.data_type(),
295                        to: ConcreteDataType::string_datatype(),
296                    })?;
297
298                for i in 0..batch.num_rows() {
299                    let data = data.get_ref(i);
300                    let text = data
301                        .as_string()
302                        .context(DataTypeMismatchSnafu)?
303                        .unwrap_or_default();
304                    self.inner.push_text(text).await?;
305                }
306            }
307            _ => {
308                // If the column is not found in the batch, push empty text.
309                // Ensure that the number of texts pushed is the same as the number of rows in the SST,
310                // so that the texts are aligned with the row ids.
311                for _ in 0..batch.num_rows() {
312                    self.inner.push_text("").await?;
313                }
314            }
315        }
316
317        Ok(())
318    }
319
320    async fn update_flat(&mut self, batch: &RecordBatch) -> Result<()> {
321        // Find the column in the RecordBatch by name
322        if let Some(column_array) = batch.column_by_name(&self.column_name) {
323            // Convert Arrow array to string array.
324            // TODO(yingwen): Use Utf8View later if possible.
325            let array = datatypes::arrow::compute::cast(column_array, &DataType::Utf8)
326                .context(ComputeArrowSnafu)?;
327            let string_array = array.as_any().downcast_ref::<StringArray>().unwrap();
328            for text_opt in string_array.iter() {
329                let text = text_opt.unwrap_or_default();
330                self.inner.push_text(text).await?;
331            }
332        } else {
333            // If the column is not found in the batch, push empty text.
334            // Ensure that the number of texts pushed is the same as the number of rows in the SST,
335            // so that the texts are aligned with the row ids.
336            for _ in 0..batch.num_rows() {
337                self.inner.push_text("").await?;
338            }
339        }
340
341        Ok(())
342    }
343
344    async fn finish(&mut self, puffin_writer: &mut SstPuffinWriter) -> Result<ByteCount> {
345        let options = PutOptions {
346            compression: self.compress.then_some(CompressionCodec::Zstd),
347        };
348        self.inner
349            .finish(puffin_writer, &self.column_id, options)
350            .await
351    }
352
353    async fn abort(&mut self) -> Result<()> {
354        self.inner.abort(&self.column_id).await;
355        Ok(())
356    }
357}
358
// NOTE(review): presumably `large_enum_variant` is allowed because the two creator
// types differ noticeably in size — confirm before boxing a variant.
#[allow(dead_code, clippy::large_enum_variant)]
/// `AltFulltextCreator` is an alternative fulltext index creator that can be either Tantivy or BloomFilter.
///
/// Every method dispatches to the creator wrapped by the active variant.
enum AltFulltextCreator {
    /// Creator backed by `TantivyFulltextIndexCreator`.
    Tantivy(TantivyFulltextIndexCreator),
    /// Creator backed by `BloomFilterFulltextIndexCreator`.
    Bloom(BloomFilterFulltextIndexCreator),
}
365
366impl AltFulltextCreator {
367    async fn push_text(&mut self, text: &str) -> Result<()> {
368        match self {
369            Self::Tantivy(creator) => creator.push_text(text).await.context(FulltextPushTextSnafu),
370            Self::Bloom(creator) => creator.push_text(text).await.context(FulltextPushTextSnafu),
371        }
372    }
373
374    fn memory_usage(&self) -> usize {
375        match self {
376            Self::Tantivy(creator) => creator.memory_usage(),
377            Self::Bloom(creator) => creator.memory_usage(),
378        }
379    }
380
381    async fn finish(
382        &mut self,
383        puffin_writer: &mut SstPuffinWriter,
384        column_id: &ColumnId,
385        put_options: PutOptions,
386    ) -> Result<ByteCount> {
387        match self {
388            Self::Tantivy(creator) => {
389                let key = format!("{INDEX_BLOB_TYPE_TANTIVY}-{}", column_id);
390                creator
391                    .finish(puffin_writer, &key, put_options)
392                    .await
393                    .context(FulltextFinishSnafu)
394            }
395            Self::Bloom(creator) => {
396                let key = format!("{INDEX_BLOB_TYPE_BLOOM}-{}", column_id);
397                creator
398                    .finish(puffin_writer, &key, put_options)
399                    .await
400                    .context(FulltextFinishSnafu)
401            }
402        }
403    }
404
405    async fn abort(&mut self, column_id: &ColumnId) {
406        match self {
407            Self::Tantivy(creator) => {
408                if let Err(err) = creator.abort().await {
409                    warn!(err; "Failed to abort the fulltext index creator in the Tantivy flavor, col_id: {:?}", column_id);
410                }
411            }
412            Self::Bloom(creator) => {
413                if let Err(err) = creator.abort().await {
414                    warn!(err; "Failed to abort the fulltext index creator in the Bloom Filter flavor, col_id: {:?}", column_id);
415                }
416            }
417        }
418    }
419}
420
421#[cfg(test)]
422mod tests {
423    use std::collections::{BTreeMap, BTreeSet};
424    use std::sync::Arc;
425
426    use api::v1::SemanticType;
427    use common_base::BitVec;
428    use datatypes::data_type::DataType;
429    use datatypes::schema::{ColumnSchema, FulltextAnalyzer, FulltextOptions};
430    use datatypes::vectors::{UInt64Vector, UInt8Vector};
431    use futures::future::BoxFuture;
432    use futures::FutureExt;
433    use index::fulltext_index::search::RowId;
434    use object_store::services::Memory;
435    use object_store::ObjectStore;
436    use puffin::puffin_manager::{PuffinManager, PuffinWriter};
437    use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder, RegionMetadataRef};
438    use store_api::region_request::PathType;
439    use store_api::storage::{ConcreteDataType, RegionId};
440
441    use super::*;
442    use crate::access_layer::RegionFilePathFactory;
443    use crate::read::{Batch, BatchColumn};
444    use crate::sst::file::{FileId, RegionFileId};
445    use crate::sst::index::fulltext_index::applier::builder::{
446        FulltextQuery, FulltextRequest, FulltextTerm,
447    };
448    use crate::sst::index::fulltext_index::applier::FulltextIndexApplier;
449    use crate::sst::index::puffin_manager::PuffinManagerFactory;
450
451    fn mock_object_store() -> ObjectStore {
452        ObjectStore::new(Memory::default()).unwrap().finish()
453    }
454
455    async fn new_intm_mgr(path: impl AsRef<str>) -> IntermediateManager {
456        IntermediateManager::init_fs(path).await.unwrap()
457    }
458
459    fn mock_region_metadata(backend: FulltextBackend) -> RegionMetadataRef {
460        let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 2));
461        builder
462            .push_column_metadata(ColumnMetadata {
463                column_schema: ColumnSchema::new(
464                    "text_english_case_sensitive",
465                    ConcreteDataType::string_datatype(),
466                    true,
467                )
468                .with_fulltext_options(FulltextOptions::new_unchecked(
469                    true,
470                    FulltextAnalyzer::English,
471                    true,
472                    backend.clone(),
473                    1,
474                    0.01,
475                ))
476                .unwrap(),
477                semantic_type: SemanticType::Field,
478                column_id: 1,
479            })
480            .push_column_metadata(ColumnMetadata {
481                column_schema: ColumnSchema::new(
482                    "text_english_case_insensitive",
483                    ConcreteDataType::string_datatype(),
484                    true,
485                )
486                .with_fulltext_options(FulltextOptions::new_unchecked(
487                    true,
488                    FulltextAnalyzer::English,
489                    false,
490                    backend.clone(),
491                    1,
492                    0.01,
493                ))
494                .unwrap(),
495                semantic_type: SemanticType::Field,
496                column_id: 2,
497            })
498            .push_column_metadata(ColumnMetadata {
499                column_schema: ColumnSchema::new(
500                    "text_chinese",
501                    ConcreteDataType::string_datatype(),
502                    true,
503                )
504                .with_fulltext_options(FulltextOptions::new_unchecked(
505                    true,
506                    FulltextAnalyzer::Chinese,
507                    false,
508                    backend.clone(),
509                    1,
510                    0.01,
511                ))
512                .unwrap(),
513                semantic_type: SemanticType::Field,
514                column_id: 3,
515            })
516            .push_column_metadata(ColumnMetadata {
517                column_schema: ColumnSchema::new(
518                    "ts",
519                    ConcreteDataType::timestamp_millisecond_datatype(),
520                    false,
521                ),
522                semantic_type: SemanticType::Timestamp,
523                column_id: 4,
524            });
525
526        Arc::new(builder.build().unwrap())
527    }
528
529    fn new_batch(
530        rows: &[(
531            Option<&str>, // text_english_case_sensitive
532            Option<&str>, // text_english_case_insensitive
533            Option<&str>, // text_chinese
534        )],
535    ) -> Batch {
536        let mut vec_english_sensitive =
537            ConcreteDataType::string_datatype().create_mutable_vector(0);
538        let mut vec_english_insensitive =
539            ConcreteDataType::string_datatype().create_mutable_vector(0);
540        let mut vec_chinese = ConcreteDataType::string_datatype().create_mutable_vector(0);
541
542        for (text_english_case_sensitive, text_english_case_insensitive, text_chinese) in rows {
543            match text_english_case_sensitive {
544                Some(s) => vec_english_sensitive.push_value_ref((*s).into()),
545                None => vec_english_sensitive.push_null(),
546            }
547            match text_english_case_insensitive {
548                Some(s) => vec_english_insensitive.push_value_ref((*s).into()),
549                None => vec_english_insensitive.push_null(),
550            }
551            match text_chinese {
552                Some(s) => vec_chinese.push_value_ref((*s).into()),
553                None => vec_chinese.push_null(),
554            }
555        }
556
557        let num_rows = vec_english_sensitive.len();
558        Batch::new(
559            vec![],
560            Arc::new(UInt64Vector::from_iter_values(
561                (0..num_rows).map(|n| n as u64),
562            )),
563            Arc::new(UInt64Vector::from_iter_values(std::iter::repeat_n(
564                0, num_rows,
565            ))),
566            Arc::new(UInt8Vector::from_iter_values(std::iter::repeat_n(
567                1, num_rows,
568            ))),
569            vec![
570                BatchColumn {
571                    column_id: 1,
572                    data: vec_english_sensitive.to_vector(),
573                },
574                BatchColumn {
575                    column_id: 2,
576                    data: vec_english_insensitive.to_vector(),
577                },
578                BatchColumn {
579                    column_id: 3,
580                    data: vec_chinese.to_vector(),
581                },
582            ],
583        )
584        .unwrap()
585    }
586
    /// Applier factory that can handle both queries and terms.
    ///
    /// It builds a fulltext index with the given data rows, and returns a function
    /// that can handle both queries and terms in a single request.
    ///
    /// The function takes three parameters:
    /// - `queries`: A list of (ColumnId, query_string) pairs for fulltext queries
    /// - `terms`: A list of (ColumnId, [(bool, &str)]) for fulltext terms, where the bool
    ///   is passed through as `FulltextTerm::col_lowered`
    /// - `coarse_mask`: Only used by the Bloom backend; bit `i` becomes the flag of a
    ///   one-row row group `i`. `None` is treated as an empty mask.
    ///
    /// The returned future yields the applier's result: for Tantivy the output of
    /// `apply_fine`; for Bloom the IDs of the row groups whose returned ranges are non-empty.
    async fn build_fulltext_applier_factory(
        prefix: &str,
        backend: FulltextBackend,
        rows: &[(
            Option<&str>, // text_english_case_sensitive
            Option<&str>, // text_english_case_insensitive
            Option<&str>, // text_chinese
        )],
    ) -> impl Fn(
        Vec<(ColumnId, &str)>,
        Vec<(ColumnId, Vec<(bool, &str)>)>,
        Option<BitVec>,
    ) -> BoxFuture<'static, Option<BTreeSet<RowId>>> {
        let (d, factory) = PuffinManagerFactory::new_for_test_async(prefix).await;
        let table_dir = "table0".to_string();
        let sst_file_id = FileId::random();
        let object_store = mock_object_store();
        let region_metadata = mock_region_metadata(backend.clone());
        let intm_mgr = new_intm_mgr(d.path().to_string_lossy()).await;

        // Build the index: compression enabled, memory limit of 1024.
        let mut indexer = FulltextIndexer::new(
            &region_metadata.region_id,
            &sst_file_id,
            &intm_mgr,
            &region_metadata,
            true,
            1024,
        )
        .await
        .unwrap()
        .unwrap();

        let mut batch = new_batch(rows);
        indexer.update(&mut batch).await.unwrap();

        // Write the finished index through a puffin writer for this region file.
        let puffin_manager = factory.build(
            object_store.clone(),
            RegionFilePathFactory::new(table_dir.clone(), PathType::Bare),
        );
        let region_file_id = RegionFileId::new(region_metadata.region_id, sst_file_id);
        let mut writer = puffin_manager.writer(&region_file_id).await.unwrap();
        let _ = indexer.finish(&mut writer).await.unwrap();
        writer.finish().await.unwrap();

        move |queries: Vec<(ColumnId, &str)>,
              terms_requests: Vec<(ColumnId, Vec<(bool, &str)>)>,
              coarse_mask: Option<BitVec>| {
            // Referencing `d` makes the closure capture it, so it lives as long as the closure.
            // NOTE(review): presumably `d` is a temp-dir guard that removes the directory on drop — confirm.
            let _d = &d;
            let table_dir = table_dir.clone();
            let object_store = object_store.clone();
            let factory = factory.clone();

            // Queries and terms for the same column are merged into one request.
            let mut requests: BTreeMap<ColumnId, FulltextRequest> = BTreeMap::new();

            // Add queries
            for (column_id, query) in queries {
                requests
                    .entry(column_id)
                    .or_default()
                    .queries
                    .push(FulltextQuery(query.to_string()));
            }

            // Add terms
            for (column_id, terms) in terms_requests {
                let fulltext_terms = terms
                    .into_iter()
                    .map(|(col_lowered, term)| FulltextTerm {
                        col_lowered,
                        term: term.to_string(),
                    })
                    .collect::<Vec<_>>();

                requests
                    .entry(column_id)
                    .or_default()
                    .terms
                    .extend(fulltext_terms);
            }

            let applier = FulltextIndexApplier::new(
                table_dir,
                PathType::Bare,
                object_store,
                requests,
                factory,
            );

            let backend = backend.clone();
            // Everything the future uses is owned, which is what allows the `'static` boxed future.
            async move {
                match backend {
                    FulltextBackend::Tantivy => {
                        applier.apply_fine(region_file_id, None).await.unwrap()
                    }
                    FulltextBackend::Bloom => {
                        let coarse_mask = coarse_mask.unwrap_or_default();
                        // One row group of a single row per mask bit; the bit is the group's flag.
                        let row_groups = (0..coarse_mask.len()).map(|i| (1, coarse_mask[i]));
                        // row group id == row id
                        let resp = applier
                            .apply_coarse(region_file_id, None, row_groups)
                            .await
                            .unwrap();
                        // Keep only row groups that still have ranges left.
                        resp.map(|r| {
                            r.into_iter()
                                .filter(|(_, ranges)| !ranges.is_empty())
                                .map(|(row_group_id, _)| row_group_id as RowId)
                                .collect()
                        })
                    }
                }
            }
            .boxed()
        }
    }
709
710    fn rows(row_ids: impl IntoIterator<Item = RowId>) -> BTreeSet<RowId> {
711        row_ids.into_iter().collect()
712    }
713
714    #[tokio::test]
715    async fn test_fulltext_index_basic_case_sensitive_tantivy() {
716        let applier_factory = build_fulltext_applier_factory(
717            "test_fulltext_index_basic_case_sensitive_tantivy_",
718            FulltextBackend::Tantivy,
719            &[
720                (Some("hello"), None, None),
721                (Some("world"), None, None),
722                (None, None, None),
723                (Some("Hello, World"), None, None),
724            ],
725        )
726        .await;
727
728        let row_ids = applier_factory(vec![(1, "hello")], vec![], None).await;
729        assert_eq!(row_ids, Some(rows([0])));
730
731        let row_ids = applier_factory(vec![(1, "world")], vec![], None).await;
732        assert_eq!(row_ids, Some(rows([1])));
733
734        let row_ids = applier_factory(vec![(1, "Hello")], vec![], None).await;
735        assert_eq!(row_ids, Some(rows([3])));
736
737        let row_ids = applier_factory(vec![(1, "World")], vec![], None).await;
738        assert_eq!(row_ids, Some(rows([3])));
739
740        let row_ids = applier_factory(vec![], vec![(1, vec![(false, "hello")])], None).await;
741        assert_eq!(row_ids, Some(rows([0])));
742
743        let row_ids = applier_factory(vec![], vec![(1, vec![(true, "hello")])], None).await;
744        assert_eq!(row_ids, None);
745
746        let row_ids = applier_factory(vec![], vec![(1, vec![(false, "world")])], None).await;
747        assert_eq!(row_ids, Some(rows([1])));
748
749        let row_ids = applier_factory(vec![], vec![(1, vec![(true, "world")])], None).await;
750        assert_eq!(row_ids, None);
751
752        let row_ids = applier_factory(vec![], vec![(1, vec![(false, "Hello")])], None).await;
753        assert_eq!(row_ids, Some(rows([3])));
754
755        let row_ids = applier_factory(vec![], vec![(1, vec![(true, "Hello")])], None).await;
756        assert_eq!(row_ids, None);
757
758        let row_ids = applier_factory(vec![], vec![(1, vec![(false, "Hello, World")])], None).await;
759        assert_eq!(row_ids, Some(rows([3])));
760
761        let row_ids = applier_factory(vec![], vec![(1, vec![(true, "Hello, World")])], None).await;
762        assert_eq!(row_ids, None);
763    }
764
765    #[tokio::test]
766    async fn test_fulltext_index_basic_case_sensitive_bloom() {
767        let applier_factory = build_fulltext_applier_factory(
768            "test_fulltext_index_basic_case_sensitive_bloom_",
769            FulltextBackend::Bloom,
770            &[
771                (Some("hello"), None, None),
772                (Some("world"), None, None),
773                (None, None, None),
774                (Some("Hello, World"), None, None),
775            ],
776        )
777        .await;
778
779        let row_ids = applier_factory(
780            vec![],
781            vec![(1, vec![(false, "hello")])],
782            Some(BitVec::from_slice(&[0b1111])),
783        )
784        .await;
785        assert_eq!(row_ids, Some(rows([0])));
786
787        let row_ids = applier_factory(
788            vec![],
789            vec![(1, vec![(false, "hello")])],
790            Some(BitVec::from_slice(&[0b1110])), // row 0 is filtered out
791        )
792        .await;
793        assert_eq!(row_ids, Some(rows([])));
794
795        let row_ids = applier_factory(
796            vec![],
797            vec![(1, vec![(true, "hello")])],
798            Some(BitVec::from_slice(&[0b1111])),
799        )
800        .await;
801        assert_eq!(row_ids, None);
802
803        let row_ids = applier_factory(
804            vec![],
805            vec![(1, vec![(false, "world")])],
806            Some(BitVec::from_slice(&[0b1111])),
807        )
808        .await;
809        assert_eq!(row_ids, Some(rows([1])));
810
811        let row_ids = applier_factory(
812            vec![],
813            vec![(1, vec![(false, "world")])],
814            Some(BitVec::from_slice(&[0b1101])), // row 1 is filtered out
815        )
816        .await;
817        assert_eq!(row_ids, Some(rows([])));
818
819        let row_ids = applier_factory(
820            vec![],
821            vec![(1, vec![(true, "world")])],
822            Some(BitVec::from_slice(&[0b1111])),
823        )
824        .await;
825        assert_eq!(row_ids, None);
826
827        let row_ids = applier_factory(
828            vec![],
829            vec![(1, vec![(false, "Hello")])],
830            Some(BitVec::from_slice(&[0b1111])),
831        )
832        .await;
833        assert_eq!(row_ids, Some(rows([3])));
834
835        let row_ids = applier_factory(
836            vec![],
837            vec![(1, vec![(false, "Hello")])],
838            Some(BitVec::from_slice(&[0b0111])), // row 3 is filtered out
839        )
840        .await;
841        assert_eq!(row_ids, Some(rows([])));
842
843        let row_ids = applier_factory(
844            vec![],
845            vec![(1, vec![(true, "Hello")])],
846            Some(BitVec::from_slice(&[0b1111])),
847        )
848        .await;
849        assert_eq!(row_ids, None);
850
851        let row_ids = applier_factory(
852            vec![],
853            vec![(1, vec![(false, "Hello, World")])],
854            Some(BitVec::from_slice(&[0b1111])),
855        )
856        .await;
857        assert_eq!(row_ids, Some(rows([3])));
858
859        let row_ids = applier_factory(
860            vec![],
861            vec![(1, vec![(false, "Hello, World")])],
862            Some(BitVec::from_slice(&[0b0111])), // row 3 is filtered out
863        )
864        .await;
865        assert_eq!(row_ids, Some(rows([])));
866
867        let row_ids = applier_factory(
868            vec![],
869            vec![(1, vec![(true, "Hello, World")])],
870            Some(BitVec::from_slice(&[0b1111])),
871        )
872        .await;
873        assert_eq!(row_ids, None);
874    }
875
876    #[tokio::test]
877    async fn test_fulltext_index_basic_case_insensitive_tantivy() {
878        let applier_factory = build_fulltext_applier_factory(
879            "test_fulltext_index_basic_case_insensitive_tantivy_",
880            FulltextBackend::Tantivy,
881            &[
882                (None, Some("hello"), None),
883                (None, None, None),
884                (None, Some("world"), None),
885                (None, Some("Hello, World"), None),
886            ],
887        )
888        .await;
889
890        let row_ids = applier_factory(vec![(2, "hello")], vec![], None).await;
891        assert_eq!(row_ids, Some(rows([0, 3])));
892
893        let row_ids = applier_factory(vec![(2, "world")], vec![], None).await;
894        assert_eq!(row_ids, Some(rows([2, 3])));
895
896        let row_ids = applier_factory(vec![(2, "Hello")], vec![], None).await;
897        assert_eq!(row_ids, Some(rows([0, 3])));
898
899        let row_ids = applier_factory(vec![(2, "World")], vec![], None).await;
900        assert_eq!(row_ids, Some(rows([2, 3])));
901
902        let row_ids = applier_factory(vec![], vec![(2, vec![(false, "hello")])], None).await;
903        assert_eq!(row_ids, Some(rows([0, 3])));
904
905        let row_ids = applier_factory(vec![], vec![(2, vec![(true, "hello")])], None).await;
906        assert_eq!(row_ids, Some(rows([0, 3])));
907
908        let row_ids = applier_factory(vec![], vec![(2, vec![(false, "world")])], None).await;
909        assert_eq!(row_ids, Some(rows([2, 3])));
910
911        let row_ids = applier_factory(vec![], vec![(2, vec![(true, "world")])], None).await;
912        assert_eq!(row_ids, Some(rows([2, 3])));
913
914        let row_ids = applier_factory(vec![], vec![(2, vec![(false, "Hello")])], None).await;
915        assert_eq!(row_ids, Some(rows([0, 3])));
916
917        let row_ids = applier_factory(vec![], vec![(2, vec![(true, "Hello")])], None).await;
918        assert_eq!(row_ids, Some(rows([0, 3])));
919
920        let row_ids = applier_factory(vec![], vec![(2, vec![(false, "World")])], None).await;
921        assert_eq!(row_ids, Some(rows([2, 3])));
922
923        let row_ids = applier_factory(vec![], vec![(2, vec![(true, "World")])], None).await;
924        assert_eq!(row_ids, Some(rows([2, 3])));
925    }
926
927    #[tokio::test]
928    async fn test_fulltext_index_basic_case_insensitive_bloom() {
929        let applier_factory = build_fulltext_applier_factory(
930            "test_fulltext_index_basic_case_insensitive_bloom_",
931            FulltextBackend::Bloom,
932            &[
933                (None, Some("hello"), None),
934                (None, None, None),
935                (None, Some("world"), None),
936                (None, Some("Hello, World"), None),
937            ],
938        )
939        .await;
940
941        let row_ids = applier_factory(
942            vec![],
943            vec![(2, vec![(false, "hello")])],
944            Some(BitVec::from_slice(&[0b1111])),
945        )
946        .await;
947        assert_eq!(row_ids, Some(rows([0, 3])));
948
949        let row_ids = applier_factory(
950            vec![],
951            vec![(2, vec![(false, "hello")])],
952            Some(BitVec::from_slice(&[0b1110])), // row 0 is filtered out
953        )
954        .await;
955        assert_eq!(row_ids, Some(rows([3])));
956
957        let row_ids = applier_factory(
958            vec![],
959            vec![(2, vec![(true, "hello")])],
960            Some(BitVec::from_slice(&[0b1111])),
961        )
962        .await;
963        assert_eq!(row_ids, Some(rows([0, 3])));
964
965        let row_ids = applier_factory(
966            vec![],
967            vec![(2, vec![(true, "hello")])],
968            Some(BitVec::from_slice(&[0b1110])), // row 0 is filtered out
969        )
970        .await;
971        assert_eq!(row_ids, Some(rows([3])));
972
973        let row_ids = applier_factory(
974            vec![],
975            vec![(2, vec![(false, "world")])],
976            Some(BitVec::from_slice(&[0b1111])),
977        )
978        .await;
979        assert_eq!(row_ids, Some(rows([2, 3])));
980
981        let row_ids = applier_factory(
982            vec![],
983            vec![(2, vec![(false, "world")])],
984            Some(BitVec::from_slice(&[0b1011])), // row 2 is filtered out
985        )
986        .await;
987        assert_eq!(row_ids, Some(rows([3])));
988
989        let row_ids = applier_factory(
990            vec![],
991            vec![(2, vec![(true, "world")])],
992            Some(BitVec::from_slice(&[0b1111])),
993        )
994        .await;
995        assert_eq!(row_ids, Some(rows([2, 3])));
996
997        let row_ids = applier_factory(
998            vec![],
999            vec![(2, vec![(true, "world")])],
1000            Some(BitVec::from_slice(&[0b1011])), // row 2 is filtered out
1001        )
1002        .await;
1003        assert_eq!(row_ids, Some(rows([3])));
1004
1005        let row_ids = applier_factory(
1006            vec![],
1007            vec![(2, vec![(false, "Hello")])],
1008            Some(BitVec::from_slice(&[0b1111])),
1009        )
1010        .await;
1011        assert_eq!(row_ids, Some(rows([0, 3])));
1012
1013        let row_ids = applier_factory(
1014            vec![],
1015            vec![(2, vec![(false, "Hello")])],
1016            Some(BitVec::from_slice(&[0b0111])), // row 3 is filtered out
1017        )
1018        .await;
1019        assert_eq!(row_ids, Some(rows([0])));
1020
1021        let row_ids = applier_factory(
1022            vec![],
1023            vec![(2, vec![(true, "Hello")])],
1024            Some(BitVec::from_slice(&[0b1111])),
1025        )
1026        .await;
1027        assert_eq!(row_ids, Some(rows([0, 3])));
1028
1029        let row_ids = applier_factory(
1030            vec![],
1031            vec![(2, vec![(true, "Hello")])],
1032            Some(BitVec::from_slice(&[0b1110])), // row 0 is filtered out
1033        )
1034        .await;
1035        assert_eq!(row_ids, Some(rows([3])));
1036
1037        let row_ids = applier_factory(
1038            vec![],
1039            vec![(2, vec![(false, "World")])],
1040            Some(BitVec::from_slice(&[0b1111])),
1041        )
1042        .await;
1043        assert_eq!(row_ids, Some(rows([2, 3])));
1044
1045        let row_ids = applier_factory(
1046            vec![],
1047            vec![(2, vec![(false, "World")])],
1048            Some(BitVec::from_slice(&[0b0111])), // row 3 is filtered out
1049        )
1050        .await;
1051        assert_eq!(row_ids, Some(rows([2])));
1052
1053        let row_ids = applier_factory(
1054            vec![],
1055            vec![(2, vec![(true, "World")])],
1056            Some(BitVec::from_slice(&[0b1111])),
1057        )
1058        .await;
1059        assert_eq!(row_ids, Some(rows([2, 3])));
1060
1061        let row_ids = applier_factory(
1062            vec![],
1063            vec![(2, vec![(true, "World")])],
1064            Some(BitVec::from_slice(&[0b1011])), // row 2 is filtered out
1065        )
1066        .await;
1067        assert_eq!(row_ids, Some(rows([3])));
1068    }
1069
1070    #[tokio::test]
1071    async fn test_fulltext_index_basic_chinese_tantivy() {
1072        let applier_factory = build_fulltext_applier_factory(
1073            "test_fulltext_index_basic_chinese_tantivy_",
1074            FulltextBackend::Tantivy,
1075            &[
1076                (None, None, Some("你好")),
1077                (None, None, None),
1078                (None, None, Some("世界")),
1079                (None, None, Some("你好,世界")),
1080            ],
1081        )
1082        .await;
1083
1084        let row_ids = applier_factory(vec![(3, "你好")], vec![], None).await;
1085        assert_eq!(row_ids, Some(rows([0, 3])));
1086
1087        let row_ids = applier_factory(vec![(3, "世界")], vec![], None).await;
1088        assert_eq!(row_ids, Some(rows([2, 3])));
1089
1090        let row_ids = applier_factory(vec![], vec![(3, vec![(false, "你好")])], None).await;
1091        assert_eq!(row_ids, Some(rows([0, 3])));
1092
1093        let row_ids = applier_factory(vec![], vec![(3, vec![(false, "世界")])], None).await;
1094        assert_eq!(row_ids, Some(rows([2, 3])));
1095    }
1096
1097    #[tokio::test]
1098    async fn test_fulltext_index_basic_chinese_bloom() {
1099        let applier_factory = build_fulltext_applier_factory(
1100            "test_fulltext_index_basic_chinese_bloom_",
1101            FulltextBackend::Bloom,
1102            &[
1103                (None, None, Some("你好")),
1104                (None, None, None),
1105                (None, None, Some("世界")),
1106                (None, None, Some("你好,世界")),
1107            ],
1108        )
1109        .await;
1110
1111        let row_ids = applier_factory(
1112            vec![],
1113            vec![(3, vec![(false, "你好")])],
1114            Some(BitVec::from_slice(&[0b1111])),
1115        )
1116        .await;
1117        assert_eq!(row_ids, Some(rows([0, 3])));
1118
1119        let row_ids = applier_factory(
1120            vec![],
1121            vec![(3, vec![(false, "你好")])],
1122            Some(BitVec::from_slice(&[0b1110])), // row 0 is filtered out
1123        )
1124        .await;
1125        assert_eq!(row_ids, Some(rows([3])));
1126
1127        let row_ids = applier_factory(
1128            vec![],
1129            vec![(3, vec![(false, "世界")])],
1130            Some(BitVec::from_slice(&[0b1111])),
1131        )
1132        .await;
1133        assert_eq!(row_ids, Some(rows([2, 3])));
1134
1135        let row_ids = applier_factory(
1136            vec![],
1137            vec![(3, vec![(false, "世界")])],
1138            Some(BitVec::from_slice(&[0b1011])), // row 2 is filtered out
1139        )
1140        .await;
1141        assert_eq!(row_ids, Some(rows([3])));
1142    }
1143
1144    #[tokio::test]
1145    async fn test_fulltext_index_multi_terms_case_sensitive_tantivy() {
1146        let applier_factory = build_fulltext_applier_factory(
1147            "test_fulltext_index_multi_terms_case_sensitive_tantivy_",
1148            FulltextBackend::Tantivy,
1149            &[
1150                (Some("Hello"), None, None),
1151                (Some("World"), None, None),
1152                (None, None, None),
1153                (Some("Hello, World"), None, None),
1154            ],
1155        )
1156        .await;
1157
1158        let row_ids = applier_factory(
1159            vec![],
1160            vec![(1, vec![(false, "hello"), (false, "world")])],
1161            None,
1162        )
1163        .await;
1164        assert_eq!(row_ids, Some(rows([])));
1165
1166        let row_ids = applier_factory(
1167            vec![],
1168            vec![(1, vec![(false, "Hello"), (false, "World")])],
1169            None,
1170        )
1171        .await;
1172        assert_eq!(row_ids, Some(rows([3])));
1173
1174        let row_ids = applier_factory(
1175            vec![],
1176            vec![(1, vec![(true, "Hello"), (false, "World")])],
1177            None,
1178        )
1179        .await;
1180        assert_eq!(row_ids, Some(rows([1, 3])));
1181
1182        let row_ids = applier_factory(
1183            vec![],
1184            vec![(1, vec![(false, "Hello"), (true, "World")])],
1185            None,
1186        )
1187        .await;
1188        assert_eq!(row_ids, Some(rows([0, 3])));
1189
1190        let row_ids = applier_factory(
1191            vec![],
1192            vec![(1, vec![(true, "Hello"), (true, "World")])],
1193            None,
1194        )
1195        .await;
1196        assert_eq!(row_ids, None);
1197    }
1198
1199    #[tokio::test]
1200    async fn test_fulltext_index_multi_terms_case_sensitive_bloom() {
1201        let applier_factory = build_fulltext_applier_factory(
1202            "test_fulltext_index_multi_terms_case_sensitive_bloom_",
1203            FulltextBackend::Bloom,
1204            &[
1205                (Some("Hello"), None, None),
1206                (Some("World"), None, None),
1207                (None, None, None),
1208                (Some("Hello, World"), None, None),
1209            ],
1210        )
1211        .await;
1212
1213        let row_ids = applier_factory(
1214            vec![],
1215            vec![(1, vec![(false, "hello"), (false, "world")])],
1216            Some(BitVec::from_slice(&[0b1111])),
1217        )
1218        .await;
1219        assert_eq!(row_ids, Some(rows([])));
1220
1221        let row_ids = applier_factory(
1222            vec![],
1223            vec![(1, vec![(false, "Hello"), (false, "World")])],
1224            Some(BitVec::from_slice(&[0b1111])),
1225        )
1226        .await;
1227        assert_eq!(row_ids, Some(rows([3])));
1228
1229        let row_ids = applier_factory(
1230            vec![],
1231            vec![(1, vec![(true, "Hello"), (false, "World")])],
1232            Some(BitVec::from_slice(&[0b1111])),
1233        )
1234        .await;
1235        assert_eq!(row_ids, Some(rows([1, 3])));
1236
1237        let row_ids = applier_factory(
1238            vec![],
1239            vec![(1, vec![(false, "Hello"), (true, "World")])],
1240            Some(BitVec::from_slice(&[0b1111])),
1241        )
1242        .await;
1243        assert_eq!(row_ids, Some(rows([0, 3])));
1244
1245        let row_ids = applier_factory(
1246            vec![],
1247            vec![(1, vec![(true, "Hello"), (true, "World")])],
1248            Some(BitVec::from_slice(&[0b1111])),
1249        )
1250        .await;
1251        assert_eq!(row_ids, None);
1252    }
1253
1254    #[tokio::test]
1255    async fn test_fulltext_index_multi_terms_case_insensitive_tantivy() {
1256        let applier_factory = build_fulltext_applier_factory(
1257            "test_fulltext_index_multi_terms_case_insensitive_tantivy_",
1258            FulltextBackend::Tantivy,
1259            &[
1260                (None, Some("hello"), None),
1261                (None, None, None),
1262                (None, Some("world"), None),
1263                (None, Some("Hello, World"), None),
1264            ],
1265        )
1266        .await;
1267
1268        let row_ids = applier_factory(
1269            vec![],
1270            vec![(2, vec![(false, "hello"), (false, "world")])],
1271            None,
1272        )
1273        .await;
1274        assert_eq!(row_ids, Some(rows([3])));
1275
1276        let row_ids = applier_factory(
1277            vec![],
1278            vec![(2, vec![(true, "hello"), (false, "world")])],
1279            None,
1280        )
1281        .await;
1282        assert_eq!(row_ids, Some(rows([3])));
1283
1284        let row_ids = applier_factory(
1285            vec![],
1286            vec![(2, vec![(false, "hello"), (true, "world")])],
1287            None,
1288        )
1289        .await;
1290        assert_eq!(row_ids, Some(rows([3])));
1291
1292        let row_ids = applier_factory(
1293            vec![],
1294            vec![(2, vec![(true, "hello"), (true, "world")])],
1295            None,
1296        )
1297        .await;
1298        assert_eq!(row_ids, Some(rows([3])));
1299    }
1300
1301    #[tokio::test]
1302    async fn test_fulltext_index_multi_terms_case_insensitive_bloom() {
1303        let applier_factory = build_fulltext_applier_factory(
1304            "test_fulltext_index_multi_terms_case_insensitive_bloom_",
1305            FulltextBackend::Bloom,
1306            &[
1307                (None, Some("hello"), None),
1308                (None, None, None),
1309                (None, Some("world"), None),
1310                (None, Some("Hello, World"), None),
1311            ],
1312        )
1313        .await;
1314
1315        let row_ids = applier_factory(
1316            vec![],
1317            vec![(2, vec![(false, "hello"), (false, "world")])],
1318            Some(BitVec::from_slice(&[0b1111])),
1319        )
1320        .await;
1321        assert_eq!(row_ids, Some(rows([3])));
1322
1323        let row_ids = applier_factory(
1324            vec![],
1325            vec![(2, vec![(true, "hello"), (false, "world")])],
1326            Some(BitVec::from_slice(&[0b1111])),
1327        )
1328        .await;
1329        assert_eq!(row_ids, Some(rows([3])));
1330
1331        let row_ids = applier_factory(
1332            vec![],
1333            vec![(2, vec![(false, "hello"), (true, "world")])],
1334            Some(BitVec::from_slice(&[0b1111])),
1335        )
1336        .await;
1337        assert_eq!(row_ids, Some(rows([3])));
1338
1339        let row_ids = applier_factory(
1340            vec![],
1341            vec![(2, vec![(true, "hello"), (true, "world")])],
1342            Some(BitVec::from_slice(&[0b1111])),
1343        )
1344        .await;
1345        assert_eq!(row_ids, Some(rows([3])));
1346    }
1347
1348    #[tokio::test]
1349    async fn test_fulltext_index_multi_columns_tantivy() {
1350        let applier_factory = build_fulltext_applier_factory(
1351            "test_fulltext_index_multi_columns_tantivy_",
1352            FulltextBackend::Tantivy,
1353            &[
1354                (Some("Hello"), None, Some("你好")),
1355                (Some("World"), Some("world"), None),
1356                (None, Some("World"), Some("世界")),
1357                (
1358                    Some("Hello, World"),
1359                    Some("Hello, World"),
1360                    Some("你好,世界"),
1361                ),
1362            ],
1363        )
1364        .await;
1365
1366        let row_ids = applier_factory(
1367            vec![(1, "Hello"), (3, "你好")],
1368            vec![(2, vec![(false, "world")])],
1369            None,
1370        )
1371        .await;
1372        assert_eq!(row_ids, Some(rows([3])));
1373
1374        let row_ids =
1375            applier_factory(vec![(2, "World")], vec![(1, vec![(false, "World")])], None).await;
1376        assert_eq!(row_ids, Some(rows([1, 3])));
1377    }
1378
1379    #[tokio::test]
1380    async fn test_fulltext_index_multi_columns_bloom() {
1381        let applier_factory = build_fulltext_applier_factory(
1382            "test_fulltext_index_multi_columns_bloom_",
1383            FulltextBackend::Bloom,
1384            &[
1385                (Some("Hello"), None, Some("你好")),
1386                (Some("World"), Some("world"), None),
1387                (None, Some("World"), Some("世界")),
1388                (
1389                    Some("Hello, World"),
1390                    Some("Hello, World"),
1391                    Some("你好,世界"),
1392                ),
1393            ],
1394        )
1395        .await;
1396
1397        let row_ids = applier_factory(
1398            vec![],
1399            vec![
1400                (1, vec![(false, "Hello")]),
1401                (2, vec![(false, "world")]),
1402                (3, vec![(false, "你好")]),
1403            ],
1404            Some(BitVec::from_slice(&[0b1111])),
1405        )
1406        .await;
1407        assert_eq!(row_ids, Some(rows([3])));
1408
1409        let row_ids = applier_factory(
1410            vec![],
1411            vec![(1, vec![(false, "World")]), (2, vec![(false, "World")])],
1412            Some(BitVec::from_slice(&[0b1111])),
1413        )
1414        .await;
1415        assert_eq!(row_ids, Some(rows([1, 3])));
1416    }
1417}