mito2/sst/index/fulltext_index/
creator.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::sync::atomic::AtomicUsize;
17use std::sync::Arc;
18
19use common_telemetry::warn;
20use datatypes::schema::{FulltextAnalyzer, FulltextBackend};
21use index::fulltext_index::create::{
22    BloomFilterFulltextIndexCreator, FulltextIndexCreator, TantivyFulltextIndexCreator,
23};
24use index::fulltext_index::{Analyzer, Config};
25use puffin::blob_metadata::CompressionCodec;
26use puffin::puffin_manager::PutOptions;
27use snafu::{ensure, ResultExt};
28use store_api::metadata::RegionMetadataRef;
29use store_api::storage::{ColumnId, ConcreteDataType, RegionId};
30
31use crate::error::{
32    CastVectorSnafu, CreateFulltextCreatorSnafu, DataTypeMismatchSnafu, FulltextFinishSnafu,
33    FulltextPushTextSnafu, IndexOptionsSnafu, OperateAbortedIndexSnafu, Result,
34};
35use crate::read::Batch;
36use crate::sst::file::FileId;
37use crate::sst::index::fulltext_index::{INDEX_BLOB_TYPE_BLOOM, INDEX_BLOB_TYPE_TANTIVY};
38use crate::sst::index::intermediate::{
39    IntermediateLocation, IntermediateManager, TempFileProvider,
40};
41use crate::sst::index::puffin_manager::SstPuffinWriter;
42use crate::sst::index::statistics::{ByteCount, RowCount, Statistics};
43use crate::sst::index::TYPE_FULLTEXT_INDEX;
44
/// `FulltextIndexer` is responsible for creating fulltext indexes for SST files.
///
/// It owns one `SingleCreator` per fulltext-enabled column and forwards every
/// batch to all of them.
pub struct FulltextIndexer {
    /// Creators for each column, keyed by column ID.
    creators: HashMap<ColumnId, SingleCreator>,
    /// Whether the index creation was aborted.
    /// Once set, `update` and `finish` refuse to run and `abort` becomes a no-op.
    aborted: bool,
    /// Statistics of index creation; the row and byte counts reported by `finish` come from here.
    stats: Statistics,
}
54
55impl FulltextIndexer {
56    /// Creates a new `FulltextIndexer`.
57    pub async fn new(
58        region_id: &RegionId,
59        sst_file_id: &FileId,
60        intermediate_manager: &IntermediateManager,
61        metadata: &RegionMetadataRef,
62        compress: bool,
63        mem_limit: usize,
64    ) -> Result<Option<Self>> {
65        let mut creators = HashMap::new();
66
67        for column in &metadata.column_metadatas {
68            let options = column
69                .column_schema
70                .fulltext_options()
71                .context(IndexOptionsSnafu {
72                    column_name: &column.column_schema.name,
73                })?;
74
75            // Relax the type constraint here as many types can be casted to string.
76
77            let options = match options {
78                Some(options) if options.enable => options,
79                _ => continue,
80            };
81
82            let column_id = column.column_id;
83            let intm_path = intermediate_manager.fulltext_path(region_id, sst_file_id, column_id);
84
85            let config = Config {
86                analyzer: match options.analyzer {
87                    FulltextAnalyzer::English => Analyzer::English,
88                    FulltextAnalyzer::Chinese => Analyzer::Chinese,
89                },
90                case_sensitive: options.case_sensitive,
91            };
92
93            let inner = match options.backend {
94                FulltextBackend::Tantivy => {
95                    let creator = TantivyFulltextIndexCreator::new(&intm_path, config, mem_limit)
96                        .await
97                        .context(CreateFulltextCreatorSnafu)?;
98                    AltFulltextCreator::Tantivy(creator)
99                }
100                FulltextBackend::Bloom => {
101                    let temp_file_provider = Arc::new(TempFileProvider::new(
102                        IntermediateLocation::new(&metadata.region_id, sst_file_id),
103                        intermediate_manager.clone(),
104                    ));
105                    let global_memory_usage = Arc::new(AtomicUsize::new(0));
106                    let creator = BloomFilterFulltextIndexCreator::new(
107                        config,
108                        options.granularity as _,
109                        options.false_positive_rate(),
110                        temp_file_provider,
111                        global_memory_usage,
112                        Some(mem_limit),
113                    );
114                    AltFulltextCreator::Bloom(creator)
115                }
116            };
117
118            creators.insert(
119                column_id,
120                SingleCreator {
121                    column_id,
122                    inner,
123                    compress,
124                },
125            );
126        }
127
128        Ok((!creators.is_empty()).then(move || Self {
129            creators,
130            aborted: false,
131            stats: Statistics::new(TYPE_FULLTEXT_INDEX),
132        }))
133    }
134
135    /// Updates the index with the given batch.
136    pub async fn update(&mut self, batch: &mut Batch) -> Result<()> {
137        ensure!(!self.aborted, OperateAbortedIndexSnafu);
138
139        if let Err(update_err) = self.do_update(batch).await {
140            if let Err(err) = self.do_abort().await {
141                if cfg!(any(test, feature = "test")) {
142                    panic!("Failed to abort index creator, err: {err}");
143                } else {
144                    warn!(err; "Failed to abort index creator");
145                }
146            }
147            return Err(update_err);
148        }
149
150        Ok(())
151    }
152
153    /// Finalizes the index creation.
154    pub async fn finish(
155        &mut self,
156        puffin_writer: &mut SstPuffinWriter,
157    ) -> Result<(RowCount, ByteCount)> {
158        ensure!(!self.aborted, OperateAbortedIndexSnafu);
159
160        match self.do_finish(puffin_writer).await {
161            Ok(()) => Ok((self.stats.row_count(), self.stats.byte_count())),
162            Err(finish_err) => {
163                if let Err(err) = self.do_abort().await {
164                    if cfg!(any(test, feature = "test")) {
165                        panic!("Failed to abort index creator, err: {err}");
166                    } else {
167                        warn!(err; "Failed to abort index creator");
168                    }
169                }
170                Err(finish_err)
171            }
172        }
173    }
174
175    /// Aborts the index creation.
176    pub async fn abort(&mut self) -> Result<()> {
177        if self.aborted {
178            return Ok(());
179        }
180
181        self.do_abort().await
182    }
183
184    /// Returns the memory usage of the index creator.
185    pub fn memory_usage(&self) -> usize {
186        self.creators.values().map(|c| c.inner.memory_usage()).sum()
187    }
188
189    /// Returns IDs of columns that the creator is responsible for.
190    pub fn column_ids(&self) -> impl Iterator<Item = ColumnId> + '_ {
191        self.creators.keys().copied()
192    }
193}
194
195impl FulltextIndexer {
196    async fn do_update(&mut self, batch: &mut Batch) -> Result<()> {
197        let mut guard = self.stats.record_update();
198        guard.inc_row_count(batch.num_rows());
199
200        for creator in self.creators.values_mut() {
201            creator.update(batch).await?;
202        }
203
204        Ok(())
205    }
206
207    async fn do_finish(&mut self, puffin_writer: &mut SstPuffinWriter) -> Result<()> {
208        let mut guard = self.stats.record_finish();
209
210        let mut written_bytes = 0;
211        for creator in self.creators.values_mut() {
212            written_bytes += creator.finish(puffin_writer).await?;
213        }
214
215        guard.inc_byte_count(written_bytes);
216        Ok(())
217    }
218
219    async fn do_abort(&mut self) -> Result<()> {
220        let _guard = self.stats.record_cleanup();
221
222        self.aborted = true;
223
224        for (_, mut creator) in self.creators.drain() {
225            creator.abort().await?;
226        }
227
228        Ok(())
229    }
230}
231
/// `SingleCreator` is a creator for a single column.
///
/// It pairs the backend-specific creator with the column it indexes and the
/// compression choice applied when the index is written out.
struct SingleCreator {
    /// Column ID. Used to locate the column in a batch and to build the blob key on finish.
    column_id: ColumnId,
    /// Inner creator (Tantivy or Bloom flavor).
    inner: AltFulltextCreator,
    /// Whether the index should be compressed (Zstd is used when `true`).
    compress: bool,
}
241
242impl SingleCreator {
243    async fn update(&mut self, batch: &mut Batch) -> Result<()> {
244        let text_column = batch
245            .fields()
246            .iter()
247            .find(|c| c.column_id == self.column_id);
248        match text_column {
249            Some(column) => {
250                let data = column
251                    .data
252                    .cast(&ConcreteDataType::string_datatype())
253                    .context(CastVectorSnafu {
254                        from: column.data.data_type(),
255                        to: ConcreteDataType::string_datatype(),
256                    })?;
257
258                for i in 0..batch.num_rows() {
259                    let data = data.get_ref(i);
260                    let text = data
261                        .as_string()
262                        .context(DataTypeMismatchSnafu)?
263                        .unwrap_or_default();
264                    self.inner.push_text(text).await?;
265                }
266            }
267            _ => {
268                // If the column is not found in the batch, push empty text.
269                // Ensure that the number of texts pushed is the same as the number of rows in the SST,
270                // so that the texts are aligned with the row ids.
271                for _ in 0..batch.num_rows() {
272                    self.inner.push_text("").await?;
273                }
274            }
275        }
276
277        Ok(())
278    }
279
280    async fn finish(&mut self, puffin_writer: &mut SstPuffinWriter) -> Result<ByteCount> {
281        let options = PutOptions {
282            compression: self.compress.then_some(CompressionCodec::Zstd),
283        };
284        self.inner
285            .finish(puffin_writer, &self.column_id, options)
286            .await
287    }
288
289    async fn abort(&mut self) -> Result<()> {
290        self.inner.abort(&self.column_id).await;
291        Ok(())
292    }
293}
294
// NOTE(review): both variants are constructed in `FulltextIndexer::new`, so `dead_code`
// may no longer be needed here; `large_enum_variant` presumably reflects a size gap
// between the two creators. Confirm before removing either.
#[allow(dead_code, clippy::large_enum_variant)]
/// `AltFulltextCreator` is an alternative fulltext index creator that can be either Tantivy or BloomFilter.
///
/// It lets `SingleCreator` drive either backend through the same set of methods.
enum AltFulltextCreator {
    /// Creator backed by Tantivy.
    Tantivy(TantivyFulltextIndexCreator),
    /// Creator backed by a bloom filter.
    Bloom(BloomFilterFulltextIndexCreator),
}
301
302impl AltFulltextCreator {
303    async fn push_text(&mut self, text: &str) -> Result<()> {
304        match self {
305            Self::Tantivy(creator) => creator.push_text(text).await.context(FulltextPushTextSnafu),
306            Self::Bloom(creator) => creator.push_text(text).await.context(FulltextPushTextSnafu),
307        }
308    }
309
310    fn memory_usage(&self) -> usize {
311        match self {
312            Self::Tantivy(creator) => creator.memory_usage(),
313            Self::Bloom(creator) => creator.memory_usage(),
314        }
315    }
316
317    async fn finish(
318        &mut self,
319        puffin_writer: &mut SstPuffinWriter,
320        column_id: &ColumnId,
321        put_options: PutOptions,
322    ) -> Result<ByteCount> {
323        match self {
324            Self::Tantivy(creator) => {
325                let key = format!("{INDEX_BLOB_TYPE_TANTIVY}-{}", column_id);
326                creator
327                    .finish(puffin_writer, &key, put_options)
328                    .await
329                    .context(FulltextFinishSnafu)
330            }
331            Self::Bloom(creator) => {
332                let key = format!("{INDEX_BLOB_TYPE_BLOOM}-{}", column_id);
333                creator
334                    .finish(puffin_writer, &key, put_options)
335                    .await
336                    .context(FulltextFinishSnafu)
337            }
338        }
339    }
340
341    async fn abort(&mut self, column_id: &ColumnId) {
342        match self {
343            Self::Tantivy(creator) => {
344                if let Err(err) = creator.abort().await {
345                    warn!(err; "Failed to abort the fulltext index creator in the Tantivy flavor, col_id: {:?}", column_id);
346                }
347            }
348            Self::Bloom(creator) => {
349                if let Err(err) = creator.abort().await {
350                    warn!(err; "Failed to abort the fulltext index creator in the Bloom Filter flavor, col_id: {:?}", column_id);
351                }
352            }
353        }
354    }
355}
356
357#[cfg(test)]
358mod tests {
359    use std::collections::{BTreeMap, BTreeSet};
360    use std::sync::Arc;
361
362    use api::v1::SemanticType;
363    use common_base::BitVec;
364    use datatypes::data_type::DataType;
365    use datatypes::schema::{ColumnSchema, FulltextAnalyzer, FulltextOptions};
366    use datatypes::vectors::{UInt64Vector, UInt8Vector};
367    use futures::future::BoxFuture;
368    use futures::FutureExt;
369    use index::fulltext_index::search::RowId;
370    use object_store::services::Memory;
371    use object_store::ObjectStore;
372    use puffin::puffin_manager::{PuffinManager, PuffinWriter};
373    use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder, RegionMetadataRef};
374    use store_api::storage::{ConcreteDataType, RegionId};
375
376    use super::*;
377    use crate::access_layer::RegionFilePathFactory;
378    use crate::read::{Batch, BatchColumn};
379    use crate::sst::file::FileId;
380    use crate::sst::index::fulltext_index::applier::builder::{
381        FulltextQuery, FulltextRequest, FulltextTerm,
382    };
383    use crate::sst::index::fulltext_index::applier::FulltextIndexApplier;
384    use crate::sst::index::puffin_manager::PuffinManagerFactory;
385
386    fn mock_object_store() -> ObjectStore {
387        ObjectStore::new(Memory::default()).unwrap().finish()
388    }
389
390    async fn new_intm_mgr(path: impl AsRef<str>) -> IntermediateManager {
391        IntermediateManager::init_fs(path).await.unwrap()
392    }
393
394    fn mock_region_metadata(backend: FulltextBackend) -> RegionMetadataRef {
395        let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 2));
396        builder
397            .push_column_metadata(ColumnMetadata {
398                column_schema: ColumnSchema::new(
399                    "text_english_case_sensitive",
400                    ConcreteDataType::string_datatype(),
401                    true,
402                )
403                .with_fulltext_options(FulltextOptions::new_unchecked(
404                    true,
405                    FulltextAnalyzer::English,
406                    true,
407                    backend.clone(),
408                    1,
409                    0.01,
410                ))
411                .unwrap(),
412                semantic_type: SemanticType::Field,
413                column_id: 1,
414            })
415            .push_column_metadata(ColumnMetadata {
416                column_schema: ColumnSchema::new(
417                    "text_english_case_insensitive",
418                    ConcreteDataType::string_datatype(),
419                    true,
420                )
421                .with_fulltext_options(FulltextOptions::new_unchecked(
422                    true,
423                    FulltextAnalyzer::English,
424                    false,
425                    backend.clone(),
426                    1,
427                    0.01,
428                ))
429                .unwrap(),
430                semantic_type: SemanticType::Field,
431                column_id: 2,
432            })
433            .push_column_metadata(ColumnMetadata {
434                column_schema: ColumnSchema::new(
435                    "text_chinese",
436                    ConcreteDataType::string_datatype(),
437                    true,
438                )
439                .with_fulltext_options(FulltextOptions::new_unchecked(
440                    true,
441                    FulltextAnalyzer::Chinese,
442                    false,
443                    backend.clone(),
444                    1,
445                    0.01,
446                ))
447                .unwrap(),
448                semantic_type: SemanticType::Field,
449                column_id: 3,
450            })
451            .push_column_metadata(ColumnMetadata {
452                column_schema: ColumnSchema::new(
453                    "ts",
454                    ConcreteDataType::timestamp_millisecond_datatype(),
455                    false,
456                ),
457                semantic_type: SemanticType::Timestamp,
458                column_id: 4,
459            });
460
461        Arc::new(builder.build().unwrap())
462    }
463
464    fn new_batch(
465        rows: &[(
466            Option<&str>, // text_english_case_sensitive
467            Option<&str>, // text_english_case_insensitive
468            Option<&str>, // text_chinese
469        )],
470    ) -> Batch {
471        let mut vec_english_sensitive =
472            ConcreteDataType::string_datatype().create_mutable_vector(0);
473        let mut vec_english_insensitive =
474            ConcreteDataType::string_datatype().create_mutable_vector(0);
475        let mut vec_chinese = ConcreteDataType::string_datatype().create_mutable_vector(0);
476
477        for (text_english_case_sensitive, text_english_case_insensitive, text_chinese) in rows {
478            match text_english_case_sensitive {
479                Some(s) => vec_english_sensitive.push_value_ref((*s).into()),
480                None => vec_english_sensitive.push_null(),
481            }
482            match text_english_case_insensitive {
483                Some(s) => vec_english_insensitive.push_value_ref((*s).into()),
484                None => vec_english_insensitive.push_null(),
485            }
486            match text_chinese {
487                Some(s) => vec_chinese.push_value_ref((*s).into()),
488                None => vec_chinese.push_null(),
489            }
490        }
491
492        let num_rows = vec_english_sensitive.len();
493        Batch::new(
494            vec![],
495            Arc::new(UInt64Vector::from_iter_values(
496                (0..num_rows).map(|n| n as u64),
497            )),
498            Arc::new(UInt64Vector::from_iter_values(std::iter::repeat_n(
499                0, num_rows,
500            ))),
501            Arc::new(UInt8Vector::from_iter_values(std::iter::repeat_n(
502                1, num_rows,
503            ))),
504            vec![
505                BatchColumn {
506                    column_id: 1,
507                    data: vec_english_sensitive.to_vector(),
508                },
509                BatchColumn {
510                    column_id: 2,
511                    data: vec_english_insensitive.to_vector(),
512                },
513                BatchColumn {
514                    column_id: 3,
515                    data: vec_chinese.to_vector(),
516                },
517            ],
518        )
519        .unwrap()
520    }
521
    /// Applier factory that can handle both queries and terms.
    ///
    /// It builds a fulltext index with the given data rows (a single batch, written with
    /// compression enabled and a memory limit of 1024), and returns a function
    /// that can handle both queries and terms in a single request.
    ///
    /// The returned function takes three parameters:
    /// - `queries`: A list of (ColumnId, query_string) pairs for fulltext queries
    /// - `terms`: A list of (ColumnId, [(bool, String)]) for fulltext terms; the bool is stored
    ///   as `FulltextTerm::col_lowered`
    /// - `coarse_mask`: Only used by the Bloom backend; bit `i` tells whether row group `i` is
    ///   searched. A missing mask is treated as an empty one.
    ///
    /// For the Tantivy backend the result of `apply_fine` is returned as is. For the Bloom
    /// backend the result of `apply_coarse` is reduced to the ids of row groups that still
    /// have non-empty ranges.
    async fn build_fulltext_applier_factory(
        prefix: &str,
        backend: FulltextBackend,
        rows: &[(
            Option<&str>, // text_english_case_sensitive
            Option<&str>, // text_english_case_insensitive
            Option<&str>, // text_chinese
        )],
    ) -> impl Fn(
        Vec<(ColumnId, &str)>,
        Vec<(ColumnId, Vec<(bool, &str)>)>,
        Option<BitVec>,
    ) -> BoxFuture<'static, Option<BTreeSet<RowId>>> {
        // `d` is presumably a temporary directory guard; its path hosts the intermediate files.
        let (d, factory) = PuffinManagerFactory::new_for_test_async(prefix).await;
        let region_dir = "region0".to_string();
        let sst_file_id = FileId::random();
        let object_store = mock_object_store();
        let region_metadata = mock_region_metadata(backend.clone());
        let intm_mgr = new_intm_mgr(d.path().to_string_lossy()).await;

        // The mock metadata always has fulltext columns, so both unwraps are expected to succeed.
        let mut indexer = FulltextIndexer::new(
            &region_metadata.region_id,
            &sst_file_id,
            &intm_mgr,
            &region_metadata,
            true,
            1024,
        )
        .await
        .unwrap()
        .unwrap();

        // Index all rows as one batch.
        let mut batch = new_batch(rows);
        indexer.update(&mut batch).await.unwrap();

        // Write the finished index into the puffin file of `sst_file_id`.
        let puffin_manager = factory.build(
            object_store.clone(),
            RegionFilePathFactory::new(region_dir.clone()),
        );
        let mut writer = puffin_manager.writer(&sst_file_id).await.unwrap();
        let _ = indexer.finish(&mut writer).await.unwrap();
        writer.finish().await.unwrap();

        move |queries: Vec<(ColumnId, &str)>,
              terms_requests: Vec<(ColumnId, Vec<(bool, &str)>)>,
              coarse_mask: Option<BitVec>| {
            // Referencing `d` makes the `move` closure capture it, so it lives as long as the
            // returned function does.
            let _d = &d;
            let region_dir = region_dir.clone();
            let object_store = object_store.clone();
            let factory = factory.clone();

            // Queries and terms for the same column end up in the same request.
            let mut requests: BTreeMap<ColumnId, FulltextRequest> = BTreeMap::new();

            // Add queries
            for (column_id, query) in queries {
                requests
                    .entry(column_id)
                    .or_default()
                    .queries
                    .push(FulltextQuery(query.to_string()));
            }

            // Add terms
            for (column_id, terms) in terms_requests {
                let fulltext_terms = terms
                    .into_iter()
                    .map(|(col_lowered, term)| FulltextTerm {
                        col_lowered,
                        term: term.to_string(),
                    })
                    .collect::<Vec<_>>();

                requests
                    .entry(column_id)
                    .or_default()
                    .terms
                    .extend(fulltext_terms);
            }

            let applier = FulltextIndexApplier::new(
                region_dir,
                region_metadata.region_id,
                object_store,
                requests,
                factory,
            );

            let backend = backend.clone();
            // Only owned values are moved into the future, which allows the `'static` bound.
            async move {
                match backend {
                    FulltextBackend::Tantivy => {
                        applier.apply_fine(sst_file_id, None).await.unwrap()
                    }
                    FulltextBackend::Bloom => {
                        let coarse_mask = coarse_mask.unwrap_or_default();
                        // One entry `(1, selected)` per mask bit.
                        let row_groups = (0..coarse_mask.len()).map(|i| (1, coarse_mask[i]));
                        // row group id == row id
                        let resp = applier
                            .apply_coarse(sst_file_id, None, row_groups)
                            .await
                            .unwrap();
                        // Keep only the row groups that still have ranges to read.
                        resp.map(|r| {
                            r.into_iter()
                                .filter(|(_, ranges)| !ranges.is_empty())
                                .map(|(row_group_id, _)| row_group_id as RowId)
                                .collect()
                        })
                    }
                }
            }
            .boxed()
        }
    }
643
644    fn rows(row_ids: impl IntoIterator<Item = RowId>) -> BTreeSet<RowId> {
645        row_ids.into_iter().collect()
646    }
647
648    #[tokio::test]
649    async fn test_fulltext_index_basic_case_sensitive_tantivy() {
650        let applier_factory = build_fulltext_applier_factory(
651            "test_fulltext_index_basic_case_sensitive_tantivy_",
652            FulltextBackend::Tantivy,
653            &[
654                (Some("hello"), None, None),
655                (Some("world"), None, None),
656                (None, None, None),
657                (Some("Hello, World"), None, None),
658            ],
659        )
660        .await;
661
662        let row_ids = applier_factory(vec![(1, "hello")], vec![], None).await;
663        assert_eq!(row_ids, Some(rows([0])));
664
665        let row_ids = applier_factory(vec![(1, "world")], vec![], None).await;
666        assert_eq!(row_ids, Some(rows([1])));
667
668        let row_ids = applier_factory(vec![(1, "Hello")], vec![], None).await;
669        assert_eq!(row_ids, Some(rows([3])));
670
671        let row_ids = applier_factory(vec![(1, "World")], vec![], None).await;
672        assert_eq!(row_ids, Some(rows([3])));
673
674        let row_ids = applier_factory(vec![], vec![(1, vec![(false, "hello")])], None).await;
675        assert_eq!(row_ids, Some(rows([0])));
676
677        let row_ids = applier_factory(vec![], vec![(1, vec![(true, "hello")])], None).await;
678        assert_eq!(row_ids, None);
679
680        let row_ids = applier_factory(vec![], vec![(1, vec![(false, "world")])], None).await;
681        assert_eq!(row_ids, Some(rows([1])));
682
683        let row_ids = applier_factory(vec![], vec![(1, vec![(true, "world")])], None).await;
684        assert_eq!(row_ids, None);
685
686        let row_ids = applier_factory(vec![], vec![(1, vec![(false, "Hello")])], None).await;
687        assert_eq!(row_ids, Some(rows([3])));
688
689        let row_ids = applier_factory(vec![], vec![(1, vec![(true, "Hello")])], None).await;
690        assert_eq!(row_ids, None);
691
692        let row_ids = applier_factory(vec![], vec![(1, vec![(false, "Hello, World")])], None).await;
693        assert_eq!(row_ids, Some(rows([3])));
694
695        let row_ids = applier_factory(vec![], vec![(1, vec![(true, "Hello, World")])], None).await;
696        assert_eq!(row_ids, None);
697    }
698
699    #[tokio::test]
700    async fn test_fulltext_index_basic_case_sensitive_bloom() {
701        let applier_factory = build_fulltext_applier_factory(
702            "test_fulltext_index_basic_case_sensitive_bloom_",
703            FulltextBackend::Bloom,
704            &[
705                (Some("hello"), None, None),
706                (Some("world"), None, None),
707                (None, None, None),
708                (Some("Hello, World"), None, None),
709            ],
710        )
711        .await;
712
713        let row_ids = applier_factory(
714            vec![],
715            vec![(1, vec![(false, "hello")])],
716            Some(BitVec::from_slice(&[0b1111])),
717        )
718        .await;
719        assert_eq!(row_ids, Some(rows([0])));
720
721        let row_ids = applier_factory(
722            vec![],
723            vec![(1, vec![(false, "hello")])],
724            Some(BitVec::from_slice(&[0b1110])), // row 0 is filtered out
725        )
726        .await;
727        assert_eq!(row_ids, Some(rows([])));
728
729        let row_ids = applier_factory(
730            vec![],
731            vec![(1, vec![(true, "hello")])],
732            Some(BitVec::from_slice(&[0b1111])),
733        )
734        .await;
735        assert_eq!(row_ids, None);
736
737        let row_ids = applier_factory(
738            vec![],
739            vec![(1, vec![(false, "world")])],
740            Some(BitVec::from_slice(&[0b1111])),
741        )
742        .await;
743        assert_eq!(row_ids, Some(rows([1])));
744
745        let row_ids = applier_factory(
746            vec![],
747            vec![(1, vec![(false, "world")])],
748            Some(BitVec::from_slice(&[0b1101])), // row 1 is filtered out
749        )
750        .await;
751        assert_eq!(row_ids, Some(rows([])));
752
753        let row_ids = applier_factory(
754            vec![],
755            vec![(1, vec![(true, "world")])],
756            Some(BitVec::from_slice(&[0b1111])),
757        )
758        .await;
759        assert_eq!(row_ids, None);
760
761        let row_ids = applier_factory(
762            vec![],
763            vec![(1, vec![(false, "Hello")])],
764            Some(BitVec::from_slice(&[0b1111])),
765        )
766        .await;
767        assert_eq!(row_ids, Some(rows([3])));
768
769        let row_ids = applier_factory(
770            vec![],
771            vec![(1, vec![(false, "Hello")])],
772            Some(BitVec::from_slice(&[0b0111])), // row 3 is filtered out
773        )
774        .await;
775        assert_eq!(row_ids, Some(rows([])));
776
777        let row_ids = applier_factory(
778            vec![],
779            vec![(1, vec![(true, "Hello")])],
780            Some(BitVec::from_slice(&[0b1111])),
781        )
782        .await;
783        assert_eq!(row_ids, None);
784
785        let row_ids = applier_factory(
786            vec![],
787            vec![(1, vec![(false, "Hello, World")])],
788            Some(BitVec::from_slice(&[0b1111])),
789        )
790        .await;
791        assert_eq!(row_ids, Some(rows([3])));
792
793        let row_ids = applier_factory(
794            vec![],
795            vec![(1, vec![(false, "Hello, World")])],
796            Some(BitVec::from_slice(&[0b0111])), // row 3 is filtered out
797        )
798        .await;
799        assert_eq!(row_ids, Some(rows([])));
800
801        let row_ids = applier_factory(
802            vec![],
803            vec![(1, vec![(true, "Hello, World")])],
804            Some(BitVec::from_slice(&[0b1111])),
805        )
806        .await;
807        assert_eq!(row_ids, None);
808    }
809
810    #[tokio::test]
811    async fn test_fulltext_index_basic_case_insensitive_tantivy() {
812        let applier_factory = build_fulltext_applier_factory(
813            "test_fulltext_index_basic_case_insensitive_tantivy_",
814            FulltextBackend::Tantivy,
815            &[
816                (None, Some("hello"), None),
817                (None, None, None),
818                (None, Some("world"), None),
819                (None, Some("Hello, World"), None),
820            ],
821        )
822        .await;
823
824        let row_ids = applier_factory(vec![(2, "hello")], vec![], None).await;
825        assert_eq!(row_ids, Some(rows([0, 3])));
826
827        let row_ids = applier_factory(vec![(2, "world")], vec![], None).await;
828        assert_eq!(row_ids, Some(rows([2, 3])));
829
830        let row_ids = applier_factory(vec![(2, "Hello")], vec![], None).await;
831        assert_eq!(row_ids, Some(rows([0, 3])));
832
833        let row_ids = applier_factory(vec![(2, "World")], vec![], None).await;
834        assert_eq!(row_ids, Some(rows([2, 3])));
835
836        let row_ids = applier_factory(vec![], vec![(2, vec![(false, "hello")])], None).await;
837        assert_eq!(row_ids, Some(rows([0, 3])));
838
839        let row_ids = applier_factory(vec![], vec![(2, vec![(true, "hello")])], None).await;
840        assert_eq!(row_ids, Some(rows([0, 3])));
841
842        let row_ids = applier_factory(vec![], vec![(2, vec![(false, "world")])], None).await;
843        assert_eq!(row_ids, Some(rows([2, 3])));
844
845        let row_ids = applier_factory(vec![], vec![(2, vec![(true, "world")])], None).await;
846        assert_eq!(row_ids, Some(rows([2, 3])));
847
848        let row_ids = applier_factory(vec![], vec![(2, vec![(false, "Hello")])], None).await;
849        assert_eq!(row_ids, Some(rows([0, 3])));
850
851        let row_ids = applier_factory(vec![], vec![(2, vec![(true, "Hello")])], None).await;
852        assert_eq!(row_ids, Some(rows([0, 3])));
853
854        let row_ids = applier_factory(vec![], vec![(2, vec![(false, "World")])], None).await;
855        assert_eq!(row_ids, Some(rows([2, 3])));
856
857        let row_ids = applier_factory(vec![], vec![(2, vec![(true, "World")])], None).await;
858        assert_eq!(row_ids, Some(rows([2, 3])));
859    }
860
861    #[tokio::test]
862    async fn test_fulltext_index_basic_case_insensitive_bloom() {
863        let applier_factory = build_fulltext_applier_factory(
864            "test_fulltext_index_basic_case_insensitive_bloom_",
865            FulltextBackend::Bloom,
866            &[
867                (None, Some("hello"), None),
868                (None, None, None),
869                (None, Some("world"), None),
870                (None, Some("Hello, World"), None),
871            ],
872        )
873        .await;
874
875        let row_ids = applier_factory(
876            vec![],
877            vec![(2, vec![(false, "hello")])],
878            Some(BitVec::from_slice(&[0b1111])),
879        )
880        .await;
881        assert_eq!(row_ids, Some(rows([0, 3])));
882
883        let row_ids = applier_factory(
884            vec![],
885            vec![(2, vec![(false, "hello")])],
886            Some(BitVec::from_slice(&[0b1110])), // row 0 is filtered out
887        )
888        .await;
889        assert_eq!(row_ids, Some(rows([3])));
890
891        let row_ids = applier_factory(
892            vec![],
893            vec![(2, vec![(true, "hello")])],
894            Some(BitVec::from_slice(&[0b1111])),
895        )
896        .await;
897        assert_eq!(row_ids, Some(rows([0, 3])));
898
899        let row_ids = applier_factory(
900            vec![],
901            vec![(2, vec![(true, "hello")])],
902            Some(BitVec::from_slice(&[0b1110])), // row 0 is filtered out
903        )
904        .await;
905        assert_eq!(row_ids, Some(rows([3])));
906
907        let row_ids = applier_factory(
908            vec![],
909            vec![(2, vec![(false, "world")])],
910            Some(BitVec::from_slice(&[0b1111])),
911        )
912        .await;
913        assert_eq!(row_ids, Some(rows([2, 3])));
914
915        let row_ids = applier_factory(
916            vec![],
917            vec![(2, vec![(false, "world")])],
918            Some(BitVec::from_slice(&[0b1011])), // row 2 is filtered out
919        )
920        .await;
921        assert_eq!(row_ids, Some(rows([3])));
922
923        let row_ids = applier_factory(
924            vec![],
925            vec![(2, vec![(true, "world")])],
926            Some(BitVec::from_slice(&[0b1111])),
927        )
928        .await;
929        assert_eq!(row_ids, Some(rows([2, 3])));
930
931        let row_ids = applier_factory(
932            vec![],
933            vec![(2, vec![(true, "world")])],
934            Some(BitVec::from_slice(&[0b1011])), // row 2 is filtered out
935        )
936        .await;
937        assert_eq!(row_ids, Some(rows([3])));
938
939        let row_ids = applier_factory(
940            vec![],
941            vec![(2, vec![(false, "Hello")])],
942            Some(BitVec::from_slice(&[0b1111])),
943        )
944        .await;
945        assert_eq!(row_ids, Some(rows([0, 3])));
946
947        let row_ids = applier_factory(
948            vec![],
949            vec![(2, vec![(false, "Hello")])],
950            Some(BitVec::from_slice(&[0b0111])), // row 3 is filtered out
951        )
952        .await;
953        assert_eq!(row_ids, Some(rows([0])));
954
955        let row_ids = applier_factory(
956            vec![],
957            vec![(2, vec![(true, "Hello")])],
958            Some(BitVec::from_slice(&[0b1111])),
959        )
960        .await;
961        assert_eq!(row_ids, Some(rows([0, 3])));
962
963        let row_ids = applier_factory(
964            vec![],
965            vec![(2, vec![(true, "Hello")])],
966            Some(BitVec::from_slice(&[0b1110])), // row 0 is filtered out
967        )
968        .await;
969        assert_eq!(row_ids, Some(rows([3])));
970
971        let row_ids = applier_factory(
972            vec![],
973            vec![(2, vec![(false, "World")])],
974            Some(BitVec::from_slice(&[0b1111])),
975        )
976        .await;
977        assert_eq!(row_ids, Some(rows([2, 3])));
978
979        let row_ids = applier_factory(
980            vec![],
981            vec![(2, vec![(false, "World")])],
982            Some(BitVec::from_slice(&[0b0111])), // row 3 is filtered out
983        )
984        .await;
985        assert_eq!(row_ids, Some(rows([2])));
986
987        let row_ids = applier_factory(
988            vec![],
989            vec![(2, vec![(true, "World")])],
990            Some(BitVec::from_slice(&[0b1111])),
991        )
992        .await;
993        assert_eq!(row_ids, Some(rows([2, 3])));
994
995        let row_ids = applier_factory(
996            vec![],
997            vec![(2, vec![(true, "World")])],
998            Some(BitVec::from_slice(&[0b1011])), // row 2 is filtered out
999        )
1000        .await;
1001        assert_eq!(row_ids, Some(rows([3])));
1002    }
1003
1004    #[tokio::test]
1005    async fn test_fulltext_index_basic_chinese_tantivy() {
1006        let applier_factory = build_fulltext_applier_factory(
1007            "test_fulltext_index_basic_chinese_tantivy_",
1008            FulltextBackend::Tantivy,
1009            &[
1010                (None, None, Some("你好")),
1011                (None, None, None),
1012                (None, None, Some("世界")),
1013                (None, None, Some("你好,世界")),
1014            ],
1015        )
1016        .await;
1017
1018        let row_ids = applier_factory(vec![(3, "你好")], vec![], None).await;
1019        assert_eq!(row_ids, Some(rows([0, 3])));
1020
1021        let row_ids = applier_factory(vec![(3, "世界")], vec![], None).await;
1022        assert_eq!(row_ids, Some(rows([2, 3])));
1023
1024        let row_ids = applier_factory(vec![], vec![(3, vec![(false, "你好")])], None).await;
1025        assert_eq!(row_ids, Some(rows([0, 3])));
1026
1027        let row_ids = applier_factory(vec![], vec![(3, vec![(false, "世界")])], None).await;
1028        assert_eq!(row_ids, Some(rows([2, 3])));
1029    }
1030
1031    #[tokio::test]
1032    async fn test_fulltext_index_basic_chinese_bloom() {
1033        let applier_factory = build_fulltext_applier_factory(
1034            "test_fulltext_index_basic_chinese_bloom_",
1035            FulltextBackend::Bloom,
1036            &[
1037                (None, None, Some("你好")),
1038                (None, None, None),
1039                (None, None, Some("世界")),
1040                (None, None, Some("你好,世界")),
1041            ],
1042        )
1043        .await;
1044
1045        let row_ids = applier_factory(
1046            vec![],
1047            vec![(3, vec![(false, "你好")])],
1048            Some(BitVec::from_slice(&[0b1111])),
1049        )
1050        .await;
1051        assert_eq!(row_ids, Some(rows([0, 3])));
1052
1053        let row_ids = applier_factory(
1054            vec![],
1055            vec![(3, vec![(false, "你好")])],
1056            Some(BitVec::from_slice(&[0b1110])), // row 0 is filtered out
1057        )
1058        .await;
1059        assert_eq!(row_ids, Some(rows([3])));
1060
1061        let row_ids = applier_factory(
1062            vec![],
1063            vec![(3, vec![(false, "世界")])],
1064            Some(BitVec::from_slice(&[0b1111])),
1065        )
1066        .await;
1067        assert_eq!(row_ids, Some(rows([2, 3])));
1068
1069        let row_ids = applier_factory(
1070            vec![],
1071            vec![(3, vec![(false, "世界")])],
1072            Some(BitVec::from_slice(&[0b1011])), // row 2 is filtered out
1073        )
1074        .await;
1075        assert_eq!(row_ids, Some(rows([3])));
1076    }
1077
1078    #[tokio::test]
1079    async fn test_fulltext_index_multi_terms_case_sensitive_tantivy() {
1080        let applier_factory = build_fulltext_applier_factory(
1081            "test_fulltext_index_multi_terms_case_sensitive_tantivy_",
1082            FulltextBackend::Tantivy,
1083            &[
1084                (Some("Hello"), None, None),
1085                (Some("World"), None, None),
1086                (None, None, None),
1087                (Some("Hello, World"), None, None),
1088            ],
1089        )
1090        .await;
1091
1092        let row_ids = applier_factory(
1093            vec![],
1094            vec![(1, vec![(false, "hello"), (false, "world")])],
1095            None,
1096        )
1097        .await;
1098        assert_eq!(row_ids, Some(rows([])));
1099
1100        let row_ids = applier_factory(
1101            vec![],
1102            vec![(1, vec![(false, "Hello"), (false, "World")])],
1103            None,
1104        )
1105        .await;
1106        assert_eq!(row_ids, Some(rows([3])));
1107
1108        let row_ids = applier_factory(
1109            vec![],
1110            vec![(1, vec![(true, "Hello"), (false, "World")])],
1111            None,
1112        )
1113        .await;
1114        assert_eq!(row_ids, Some(rows([1, 3])));
1115
1116        let row_ids = applier_factory(
1117            vec![],
1118            vec![(1, vec![(false, "Hello"), (true, "World")])],
1119            None,
1120        )
1121        .await;
1122        assert_eq!(row_ids, Some(rows([0, 3])));
1123
1124        let row_ids = applier_factory(
1125            vec![],
1126            vec![(1, vec![(true, "Hello"), (true, "World")])],
1127            None,
1128        )
1129        .await;
1130        assert_eq!(row_ids, None);
1131    }
1132
1133    #[tokio::test]
1134    async fn test_fulltext_index_multi_terms_case_sensitive_bloom() {
1135        let applier_factory = build_fulltext_applier_factory(
1136            "test_fulltext_index_multi_terms_case_sensitive_bloom_",
1137            FulltextBackend::Bloom,
1138            &[
1139                (Some("Hello"), None, None),
1140                (Some("World"), None, None),
1141                (None, None, None),
1142                (Some("Hello, World"), None, None),
1143            ],
1144        )
1145        .await;
1146
1147        let row_ids = applier_factory(
1148            vec![],
1149            vec![(1, vec![(false, "hello"), (false, "world")])],
1150            Some(BitVec::from_slice(&[0b1111])),
1151        )
1152        .await;
1153        assert_eq!(row_ids, Some(rows([])));
1154
1155        let row_ids = applier_factory(
1156            vec![],
1157            vec![(1, vec![(false, "Hello"), (false, "World")])],
1158            Some(BitVec::from_slice(&[0b1111])),
1159        )
1160        .await;
1161        assert_eq!(row_ids, Some(rows([3])));
1162
1163        let row_ids = applier_factory(
1164            vec![],
1165            vec![(1, vec![(true, "Hello"), (false, "World")])],
1166            Some(BitVec::from_slice(&[0b1111])),
1167        )
1168        .await;
1169        assert_eq!(row_ids, Some(rows([1, 3])));
1170
1171        let row_ids = applier_factory(
1172            vec![],
1173            vec![(1, vec![(false, "Hello"), (true, "World")])],
1174            Some(BitVec::from_slice(&[0b1111])),
1175        )
1176        .await;
1177        assert_eq!(row_ids, Some(rows([0, 3])));
1178
1179        let row_ids = applier_factory(
1180            vec![],
1181            vec![(1, vec![(true, "Hello"), (true, "World")])],
1182            Some(BitVec::from_slice(&[0b1111])),
1183        )
1184        .await;
1185        assert_eq!(row_ids, None);
1186    }
1187
1188    #[tokio::test]
1189    async fn test_fulltext_index_multi_terms_case_insensitive_tantivy() {
1190        let applier_factory = build_fulltext_applier_factory(
1191            "test_fulltext_index_multi_terms_case_insensitive_tantivy_",
1192            FulltextBackend::Tantivy,
1193            &[
1194                (None, Some("hello"), None),
1195                (None, None, None),
1196                (None, Some("world"), None),
1197                (None, Some("Hello, World"), None),
1198            ],
1199        )
1200        .await;
1201
1202        let row_ids = applier_factory(
1203            vec![],
1204            vec![(2, vec![(false, "hello"), (false, "world")])],
1205            None,
1206        )
1207        .await;
1208        assert_eq!(row_ids, Some(rows([3])));
1209
1210        let row_ids = applier_factory(
1211            vec![],
1212            vec![(2, vec![(true, "hello"), (false, "world")])],
1213            None,
1214        )
1215        .await;
1216        assert_eq!(row_ids, Some(rows([3])));
1217
1218        let row_ids = applier_factory(
1219            vec![],
1220            vec![(2, vec![(false, "hello"), (true, "world")])],
1221            None,
1222        )
1223        .await;
1224        assert_eq!(row_ids, Some(rows([3])));
1225
1226        let row_ids = applier_factory(
1227            vec![],
1228            vec![(2, vec![(true, "hello"), (true, "world")])],
1229            None,
1230        )
1231        .await;
1232        assert_eq!(row_ids, Some(rows([3])));
1233    }
1234
1235    #[tokio::test]
1236    async fn test_fulltext_index_multi_terms_case_insensitive_bloom() {
1237        let applier_factory = build_fulltext_applier_factory(
1238            "test_fulltext_index_multi_terms_case_insensitive_bloom_",
1239            FulltextBackend::Bloom,
1240            &[
1241                (None, Some("hello"), None),
1242                (None, None, None),
1243                (None, Some("world"), None),
1244                (None, Some("Hello, World"), None),
1245            ],
1246        )
1247        .await;
1248
1249        let row_ids = applier_factory(
1250            vec![],
1251            vec![(2, vec![(false, "hello"), (false, "world")])],
1252            Some(BitVec::from_slice(&[0b1111])),
1253        )
1254        .await;
1255        assert_eq!(row_ids, Some(rows([3])));
1256
1257        let row_ids = applier_factory(
1258            vec![],
1259            vec![(2, vec![(true, "hello"), (false, "world")])],
1260            Some(BitVec::from_slice(&[0b1111])),
1261        )
1262        .await;
1263        assert_eq!(row_ids, Some(rows([3])));
1264
1265        let row_ids = applier_factory(
1266            vec![],
1267            vec![(2, vec![(false, "hello"), (true, "world")])],
1268            Some(BitVec::from_slice(&[0b1111])),
1269        )
1270        .await;
1271        assert_eq!(row_ids, Some(rows([3])));
1272
1273        let row_ids = applier_factory(
1274            vec![],
1275            vec![(2, vec![(true, "hello"), (true, "world")])],
1276            Some(BitVec::from_slice(&[0b1111])),
1277        )
1278        .await;
1279        assert_eq!(row_ids, Some(rows([3])));
1280    }
1281
1282    #[tokio::test]
1283    async fn test_fulltext_index_multi_columns_tantivy() {
1284        let applier_factory = build_fulltext_applier_factory(
1285            "test_fulltext_index_multi_columns_tantivy_",
1286            FulltextBackend::Tantivy,
1287            &[
1288                (Some("Hello"), None, Some("你好")),
1289                (Some("World"), Some("world"), None),
1290                (None, Some("World"), Some("世界")),
1291                (
1292                    Some("Hello, World"),
1293                    Some("Hello, World"),
1294                    Some("你好,世界"),
1295                ),
1296            ],
1297        )
1298        .await;
1299
1300        let row_ids = applier_factory(
1301            vec![(1, "Hello"), (3, "你好")],
1302            vec![(2, vec![(false, "world")])],
1303            None,
1304        )
1305        .await;
1306        assert_eq!(row_ids, Some(rows([3])));
1307
1308        let row_ids =
1309            applier_factory(vec![(2, "World")], vec![(1, vec![(false, "World")])], None).await;
1310        assert_eq!(row_ids, Some(rows([1, 3])));
1311    }
1312
1313    #[tokio::test]
1314    async fn test_fulltext_index_multi_columns_bloom() {
1315        let applier_factory = build_fulltext_applier_factory(
1316            "test_fulltext_index_multi_columns_bloom_",
1317            FulltextBackend::Bloom,
1318            &[
1319                (Some("Hello"), None, Some("你好")),
1320                (Some("World"), Some("world"), None),
1321                (None, Some("World"), Some("世界")),
1322                (
1323                    Some("Hello, World"),
1324                    Some("Hello, World"),
1325                    Some("你好,世界"),
1326                ),
1327            ],
1328        )
1329        .await;
1330
1331        let row_ids = applier_factory(
1332            vec![],
1333            vec![
1334                (1, vec![(false, "Hello")]),
1335                (2, vec![(false, "world")]),
1336                (3, vec![(false, "你好")]),
1337            ],
1338            Some(BitVec::from_slice(&[0b1111])),
1339        )
1340        .await;
1341        assert_eq!(row_ids, Some(rows([3])));
1342
1343        let row_ids = applier_factory(
1344            vec![],
1345            vec![(1, vec![(false, "World")]), (2, vec![(false, "World")])],
1346            Some(BitVec::from_slice(&[0b1111])),
1347        )
1348        .await;
1349        assert_eq!(row_ids, Some(rows([1, 3])));
1350    }
1351}