mito2/sst/index/fulltext_index/
creator.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::sync::atomic::AtomicUsize;
17use std::sync::Arc;
18
19use common_telemetry::warn;
20use datatypes::schema::{FulltextAnalyzer, FulltextBackend};
21use index::fulltext_index::create::{
22    BloomFilterFulltextIndexCreator, FulltextIndexCreator, TantivyFulltextIndexCreator,
23};
24use index::fulltext_index::{Analyzer, Config};
25use puffin::blob_metadata::CompressionCodec;
26use puffin::puffin_manager::PutOptions;
27use snafu::{ensure, ResultExt};
28use store_api::metadata::RegionMetadataRef;
29use store_api::storage::{ColumnId, ConcreteDataType, RegionId};
30
31use crate::error::{
32    CastVectorSnafu, CreateFulltextCreatorSnafu, FieldTypeMismatchSnafu, FulltextFinishSnafu,
33    FulltextPushTextSnafu, IndexOptionsSnafu, OperateAbortedIndexSnafu, Result,
34};
35use crate::read::Batch;
36use crate::sst::file::FileId;
37use crate::sst::index::fulltext_index::{INDEX_BLOB_TYPE_BLOOM, INDEX_BLOB_TYPE_TANTIVY};
38use crate::sst::index::intermediate::{
39    IntermediateLocation, IntermediateManager, TempFileProvider,
40};
41use crate::sst::index::puffin_manager::SstPuffinWriter;
42use crate::sst::index::statistics::{ByteCount, RowCount, Statistics};
43use crate::sst::index::TYPE_FULLTEXT_INDEX;
44
/// `FulltextIndexer` is responsible for creating fulltext indexes for SST files.
///
/// It owns one `SingleCreator` per fulltext-enabled column and forwards every
/// batch to all of them.
pub struct FulltextIndexer {
    /// Creators for each column, keyed by column ID.
    creators: HashMap<ColumnId, SingleCreator>,
    /// Whether the index creation was aborted. Once set, `update` and `finish`
    /// refuse to operate.
    aborted: bool,
    /// Statistics of index creation (row count and written byte count).
    stats: Statistics,
}
54
55impl FulltextIndexer {
56    /// Creates a new `FulltextIndexer`.
57    pub async fn new(
58        region_id: &RegionId,
59        sst_file_id: &FileId,
60        intermediate_manager: &IntermediateManager,
61        metadata: &RegionMetadataRef,
62        compress: bool,
63        bloom_row_granularity: usize,
64        mem_limit: usize,
65    ) -> Result<Option<Self>> {
66        let mut creators = HashMap::new();
67
68        for column in &metadata.column_metadatas {
69            let options = column
70                .column_schema
71                .fulltext_options()
72                .context(IndexOptionsSnafu {
73                    column_name: &column.column_schema.name,
74                })?;
75
76            // Relax the type constraint here as many types can be casted to string.
77
78            let options = match options {
79                Some(options) if options.enable => options,
80                _ => continue,
81            };
82
83            let column_id = column.column_id;
84            let intm_path = intermediate_manager.fulltext_path(region_id, sst_file_id, column_id);
85
86            let config = Config {
87                analyzer: match options.analyzer {
88                    FulltextAnalyzer::English => Analyzer::English,
89                    FulltextAnalyzer::Chinese => Analyzer::Chinese,
90                },
91                case_sensitive: options.case_sensitive,
92            };
93
94            let inner = match options.backend {
95                FulltextBackend::Tantivy => {
96                    let creator = TantivyFulltextIndexCreator::new(&intm_path, config, mem_limit)
97                        .await
98                        .context(CreateFulltextCreatorSnafu)?;
99                    AltFulltextCreator::Tantivy(creator)
100                }
101                FulltextBackend::Bloom => {
102                    let temp_file_provider = Arc::new(TempFileProvider::new(
103                        IntermediateLocation::new(&metadata.region_id, sst_file_id),
104                        intermediate_manager.clone(),
105                    ));
106                    let global_memory_usage = Arc::new(AtomicUsize::new(0));
107                    let creator = BloomFilterFulltextIndexCreator::new(
108                        config,
109                        bloom_row_granularity,
110                        temp_file_provider,
111                        global_memory_usage,
112                        Some(mem_limit),
113                    );
114                    AltFulltextCreator::Bloom(creator)
115                }
116            };
117
118            creators.insert(
119                column_id,
120                SingleCreator {
121                    column_id,
122                    inner,
123                    compress,
124                },
125            );
126        }
127
128        Ok((!creators.is_empty()).then(move || Self {
129            creators,
130            aborted: false,
131            stats: Statistics::new(TYPE_FULLTEXT_INDEX),
132        }))
133    }
134
135    /// Updates the index with the given batch.
136    pub async fn update(&mut self, batch: &mut Batch) -> Result<()> {
137        ensure!(!self.aborted, OperateAbortedIndexSnafu);
138
139        if let Err(update_err) = self.do_update(batch).await {
140            if let Err(err) = self.do_abort().await {
141                if cfg!(any(test, feature = "test")) {
142                    panic!("Failed to abort index creator, err: {err}");
143                } else {
144                    warn!(err; "Failed to abort index creator");
145                }
146            }
147            return Err(update_err);
148        }
149
150        Ok(())
151    }
152
153    /// Finalizes the index creation.
154    pub async fn finish(
155        &mut self,
156        puffin_writer: &mut SstPuffinWriter,
157    ) -> Result<(RowCount, ByteCount)> {
158        ensure!(!self.aborted, OperateAbortedIndexSnafu);
159
160        match self.do_finish(puffin_writer).await {
161            Ok(()) => Ok((self.stats.row_count(), self.stats.byte_count())),
162            Err(finish_err) => {
163                if let Err(err) = self.do_abort().await {
164                    if cfg!(any(test, feature = "test")) {
165                        panic!("Failed to abort index creator, err: {err}");
166                    } else {
167                        warn!(err; "Failed to abort index creator");
168                    }
169                }
170                Err(finish_err)
171            }
172        }
173    }
174
175    /// Aborts the index creation.
176    pub async fn abort(&mut self) -> Result<()> {
177        if self.aborted {
178            return Ok(());
179        }
180
181        self.do_abort().await
182    }
183
184    /// Returns the memory usage of the index creator.
185    pub fn memory_usage(&self) -> usize {
186        self.creators.values().map(|c| c.inner.memory_usage()).sum()
187    }
188
189    /// Returns IDs of columns that the creator is responsible for.
190    pub fn column_ids(&self) -> impl Iterator<Item = ColumnId> + '_ {
191        self.creators.keys().copied()
192    }
193}
194
195impl FulltextIndexer {
196    async fn do_update(&mut self, batch: &mut Batch) -> Result<()> {
197        let mut guard = self.stats.record_update();
198        guard.inc_row_count(batch.num_rows());
199
200        for creator in self.creators.values_mut() {
201            creator.update(batch).await?;
202        }
203
204        Ok(())
205    }
206
207    async fn do_finish(&mut self, puffin_writer: &mut SstPuffinWriter) -> Result<()> {
208        let mut guard = self.stats.record_finish();
209
210        let mut written_bytes = 0;
211        for creator in self.creators.values_mut() {
212            written_bytes += creator.finish(puffin_writer).await?;
213        }
214
215        guard.inc_byte_count(written_bytes);
216        Ok(())
217    }
218
219    async fn do_abort(&mut self) -> Result<()> {
220        let _guard = self.stats.record_cleanup();
221
222        self.aborted = true;
223
224        for (_, mut creator) in self.creators.drain() {
225            creator.abort().await?;
226        }
227
228        Ok(())
229    }
230}
231
/// `SingleCreator` is a creator for a single column.
///
/// It pairs the backend-specific creator with the column it indexes and the
/// compression choice used when the index blob is written.
struct SingleCreator {
    /// ID of the column whose values are indexed.
    column_id: ColumnId,
    /// Inner, backend-specific creator.
    inner: AltFulltextCreator,
    /// Whether the index should be compressed (Zstd is used when set).
    compress: bool,
}
241
242impl SingleCreator {
243    async fn update(&mut self, batch: &mut Batch) -> Result<()> {
244        let text_column = batch
245            .fields()
246            .iter()
247            .find(|c| c.column_id == self.column_id);
248        match text_column {
249            Some(column) => {
250                let data = column
251                    .data
252                    .cast(&ConcreteDataType::string_datatype())
253                    .context(CastVectorSnafu {
254                        from: column.data.data_type(),
255                        to: ConcreteDataType::string_datatype(),
256                    })?;
257
258                for i in 0..batch.num_rows() {
259                    let data = data.get_ref(i);
260                    let text = data
261                        .as_string()
262                        .context(FieldTypeMismatchSnafu)?
263                        .unwrap_or_default();
264                    self.inner.push_text(text).await?;
265                }
266            }
267            _ => {
268                // If the column is not found in the batch, push empty text.
269                // Ensure that the number of texts pushed is the same as the number of rows in the SST,
270                // so that the texts are aligned with the row ids.
271                for _ in 0..batch.num_rows() {
272                    self.inner.push_text("").await?;
273                }
274            }
275        }
276
277        Ok(())
278    }
279
280    async fn finish(&mut self, puffin_writer: &mut SstPuffinWriter) -> Result<ByteCount> {
281        let options = PutOptions {
282            compression: self.compress.then_some(CompressionCodec::Zstd),
283        };
284        self.inner
285            .finish(puffin_writer, &self.column_id, options)
286            .await
287    }
288
289    async fn abort(&mut self) -> Result<()> {
290        self.inner.abort(&self.column_id).await;
291        Ok(())
292    }
293}
294
/// `AltFulltextCreator` is an alternative fulltext index creator that can be either Tantivy or BloomFilter.
///
/// Its methods dispatch to whichever backend-specific creator is held.
// NOTE(review): the reason `dead_code` needs to be allowed is not visible in this file — confirm it is still required.
#[allow(dead_code, clippy::large_enum_variant)]
enum AltFulltextCreator {
    /// Creator of the Tantivy flavor.
    Tantivy(TantivyFulltextIndexCreator),
    /// Creator of the Bloom Filter flavor.
    Bloom(BloomFilterFulltextIndexCreator),
}
301
302impl AltFulltextCreator {
303    async fn push_text(&mut self, text: &str) -> Result<()> {
304        match self {
305            Self::Tantivy(creator) => creator.push_text(text).await.context(FulltextPushTextSnafu),
306            Self::Bloom(creator) => creator.push_text(text).await.context(FulltextPushTextSnafu),
307        }
308    }
309
310    fn memory_usage(&self) -> usize {
311        match self {
312            Self::Tantivy(creator) => creator.memory_usage(),
313            Self::Bloom(creator) => creator.memory_usage(),
314        }
315    }
316
317    async fn finish(
318        &mut self,
319        puffin_writer: &mut SstPuffinWriter,
320        column_id: &ColumnId,
321        put_options: PutOptions,
322    ) -> Result<ByteCount> {
323        match self {
324            Self::Tantivy(creator) => {
325                let key = format!("{INDEX_BLOB_TYPE_TANTIVY}-{}", column_id);
326                creator
327                    .finish(puffin_writer, &key, put_options)
328                    .await
329                    .context(FulltextFinishSnafu)
330            }
331            Self::Bloom(creator) => {
332                let key = format!("{INDEX_BLOB_TYPE_BLOOM}-{}", column_id);
333                creator
334                    .finish(puffin_writer, &key, put_options)
335                    .await
336                    .context(FulltextFinishSnafu)
337            }
338        }
339    }
340
341    async fn abort(&mut self, column_id: &ColumnId) {
342        match self {
343            Self::Tantivy(creator) => {
344                if let Err(err) = creator.abort().await {
345                    warn!(err; "Failed to abort the fulltext index creator in the Tantivy flavor, col_id: {:?}", column_id);
346                }
347            }
348            Self::Bloom(creator) => {
349                if let Err(err) = creator.abort().await {
350                    warn!(err; "Failed to abort the fulltext index creator in the Bloom Filter flavor, col_id: {:?}", column_id);
351                }
352            }
353        }
354    }
355}
356
357#[cfg(test)]
358mod tests {
359    use std::collections::BTreeSet;
360    use std::sync::Arc;
361
362    use api::v1::SemanticType;
363    use common_base::BitVec;
364    use datatypes::data_type::DataType;
365    use datatypes::schema::{ColumnSchema, FulltextAnalyzer, FulltextOptions};
366    use datatypes::vectors::{UInt64Vector, UInt8Vector};
367    use futures::future::BoxFuture;
368    use futures::FutureExt;
369    use index::fulltext_index::search::RowId;
370    use object_store::services::Memory;
371    use object_store::ObjectStore;
372    use puffin::puffin_manager::{PuffinManager, PuffinWriter};
373    use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder, RegionMetadataRef};
374    use store_api::storage::{ConcreteDataType, RegionId};
375
376    use super::*;
377    use crate::access_layer::RegionFilePathFactory;
378    use crate::read::{Batch, BatchColumn};
379    use crate::sst::file::FileId;
380    use crate::sst::index::fulltext_index::applier::builder::{
381        FulltextQuery, FulltextRequest, FulltextTerm,
382    };
383    use crate::sst::index::fulltext_index::applier::FulltextIndexApplier;
384    use crate::sst::index::puffin_manager::PuffinManagerFactory;
385
386    fn mock_object_store() -> ObjectStore {
387        ObjectStore::new(Memory::default()).unwrap().finish()
388    }
389
390    async fn new_intm_mgr(path: impl AsRef<str>) -> IntermediateManager {
391        IntermediateManager::init_fs(path).await.unwrap()
392    }
393
394    fn mock_region_metadata(backend: FulltextBackend) -> RegionMetadataRef {
395        let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 2));
396        builder
397            .push_column_metadata(ColumnMetadata {
398                column_schema: ColumnSchema::new(
399                    "text_english_case_sensitive",
400                    ConcreteDataType::string_datatype(),
401                    true,
402                )
403                .with_fulltext_options(FulltextOptions {
404                    enable: true,
405                    analyzer: FulltextAnalyzer::English,
406                    case_sensitive: true,
407                    backend: backend.clone(),
408                })
409                .unwrap(),
410                semantic_type: SemanticType::Field,
411                column_id: 1,
412            })
413            .push_column_metadata(ColumnMetadata {
414                column_schema: ColumnSchema::new(
415                    "text_english_case_insensitive",
416                    ConcreteDataType::string_datatype(),
417                    true,
418                )
419                .with_fulltext_options(FulltextOptions {
420                    enable: true,
421                    analyzer: FulltextAnalyzer::English,
422                    case_sensitive: false,
423                    backend: backend.clone(),
424                })
425                .unwrap(),
426                semantic_type: SemanticType::Field,
427                column_id: 2,
428            })
429            .push_column_metadata(ColumnMetadata {
430                column_schema: ColumnSchema::new(
431                    "text_chinese",
432                    ConcreteDataType::string_datatype(),
433                    true,
434                )
435                .with_fulltext_options(FulltextOptions {
436                    enable: true,
437                    analyzer: FulltextAnalyzer::Chinese,
438                    case_sensitive: false,
439                    backend: backend.clone(),
440                })
441                .unwrap(),
442                semantic_type: SemanticType::Field,
443                column_id: 3,
444            })
445            .push_column_metadata(ColumnMetadata {
446                column_schema: ColumnSchema::new(
447                    "ts",
448                    ConcreteDataType::timestamp_millisecond_datatype(),
449                    false,
450                ),
451                semantic_type: SemanticType::Timestamp,
452                column_id: 4,
453            });
454
455        Arc::new(builder.build().unwrap())
456    }
457
458    fn new_batch(
459        rows: &[(
460            Option<&str>, // text_english_case_sensitive
461            Option<&str>, // text_english_case_insensitive
462            Option<&str>, // text_chinese
463        )],
464    ) -> Batch {
465        let mut vec_english_sensitive =
466            ConcreteDataType::string_datatype().create_mutable_vector(0);
467        let mut vec_english_insensitive =
468            ConcreteDataType::string_datatype().create_mutable_vector(0);
469        let mut vec_chinese = ConcreteDataType::string_datatype().create_mutable_vector(0);
470
471        for (text_english_case_sensitive, text_english_case_insensitive, text_chinese) in rows {
472            match text_english_case_sensitive {
473                Some(s) => vec_english_sensitive.push_value_ref((*s).into()),
474                None => vec_english_sensitive.push_null(),
475            }
476            match text_english_case_insensitive {
477                Some(s) => vec_english_insensitive.push_value_ref((*s).into()),
478                None => vec_english_insensitive.push_null(),
479            }
480            match text_chinese {
481                Some(s) => vec_chinese.push_value_ref((*s).into()),
482                None => vec_chinese.push_null(),
483            }
484        }
485
486        let num_rows = vec_english_sensitive.len();
487        Batch::new(
488            vec![],
489            Arc::new(UInt64Vector::from_iter_values(
490                (0..num_rows).map(|n| n as u64),
491            )),
492            Arc::new(UInt64Vector::from_iter_values(std::iter::repeat_n(
493                0, num_rows,
494            ))),
495            Arc::new(UInt8Vector::from_iter_values(std::iter::repeat_n(
496                1, num_rows,
497            ))),
498            vec![
499                BatchColumn {
500                    column_id: 1,
501                    data: vec_english_sensitive.to_vector(),
502                },
503                BatchColumn {
504                    column_id: 2,
505                    data: vec_english_insensitive.to_vector(),
506                },
507                BatchColumn {
508                    column_id: 3,
509                    data: vec_chinese.to_vector(),
510                },
511            ],
512        )
513        .unwrap()
514    }
515
516    /// Applier factory that can handle both queries and terms.
517    ///
518    /// It builds a fulltext index with the given data rows, and returns a function
519    /// that can handle both queries and terms in a single request.
520    ///
521    /// The function takes two parameters:
522    /// - `queries`: A list of (ColumnId, query_string) pairs for fulltext queries
523    /// - `terms`: A list of (ColumnId, [(bool, String)]) for fulltext terms, where bool indicates if term is lowercased
524    async fn build_fulltext_applier_factory(
525        prefix: &str,
526        backend: FulltextBackend,
527        rows: &[(
528            Option<&str>, // text_english_case_sensitive
529            Option<&str>, // text_english_case_insensitive
530            Option<&str>, // text_chinese
531        )],
532    ) -> impl Fn(
533        Vec<(ColumnId, &str)>,
534        Vec<(ColumnId, Vec<(bool, &str)>)>,
535        Option<BitVec>,
536    ) -> BoxFuture<'static, Option<BTreeSet<RowId>>> {
537        let (d, factory) = PuffinManagerFactory::new_for_test_async(prefix).await;
538        let region_dir = "region0".to_string();
539        let sst_file_id = FileId::random();
540        let object_store = mock_object_store();
541        let region_metadata = mock_region_metadata(backend.clone());
542        let intm_mgr = new_intm_mgr(d.path().to_string_lossy()).await;
543
544        let mut indexer = FulltextIndexer::new(
545            &region_metadata.region_id,
546            &sst_file_id,
547            &intm_mgr,
548            &region_metadata,
549            true,
550            1,
551            1024,
552        )
553        .await
554        .unwrap()
555        .unwrap();
556
557        let mut batch = new_batch(rows);
558        indexer.update(&mut batch).await.unwrap();
559
560        let puffin_manager = factory.build(
561            object_store.clone(),
562            RegionFilePathFactory::new(region_dir.clone()),
563        );
564        let mut writer = puffin_manager.writer(&sst_file_id).await.unwrap();
565        let _ = indexer.finish(&mut writer).await.unwrap();
566        writer.finish().await.unwrap();
567
568        move |queries: Vec<(ColumnId, &str)>,
569              terms_requests: Vec<(ColumnId, Vec<(bool, &str)>)>,
570              coarse_mask: Option<BitVec>| {
571            let _d = &d;
572            let region_dir = region_dir.clone();
573            let object_store = object_store.clone();
574            let factory = factory.clone();
575
576            let mut requests: HashMap<ColumnId, FulltextRequest> = HashMap::new();
577
578            // Add queries
579            for (column_id, query) in queries {
580                requests
581                    .entry(column_id)
582                    .or_default()
583                    .queries
584                    .push(FulltextQuery(query.to_string()));
585            }
586
587            // Add terms
588            for (column_id, terms) in terms_requests {
589                let fulltext_terms = terms
590                    .into_iter()
591                    .map(|(col_lowered, term)| FulltextTerm {
592                        col_lowered,
593                        term: term.to_string(),
594                    })
595                    .collect::<Vec<_>>();
596
597                requests
598                    .entry(column_id)
599                    .or_default()
600                    .terms
601                    .extend(fulltext_terms);
602            }
603
604            let applier = FulltextIndexApplier::new(
605                region_dir,
606                region_metadata.region_id,
607                object_store,
608                requests,
609                factory,
610            );
611
612            let backend = backend.clone();
613            async move {
614                match backend {
615                    FulltextBackend::Tantivy => {
616                        applier.apply_fine(sst_file_id, None).await.unwrap()
617                    }
618                    FulltextBackend::Bloom => {
619                        let coarse_mask = coarse_mask.unwrap_or_default();
620                        let row_groups = (0..coarse_mask.len()).map(|i| (1, coarse_mask[i]));
621                        // row group id == row id
622                        let resp = applier
623                            .apply_coarse(sst_file_id, None, row_groups)
624                            .await
625                            .unwrap();
626                        resp.map(|r| {
627                            r.into_iter()
628                                .map(|(row_group_id, _)| row_group_id as RowId)
629                                .collect()
630                        })
631                    }
632                }
633            }
634            .boxed()
635        }
636    }
637
638    fn rows(row_ids: impl IntoIterator<Item = RowId>) -> BTreeSet<RowId> {
639        row_ids.into_iter().collect()
640    }
641
642    #[tokio::test]
643    async fn test_fulltext_index_basic_case_sensitive_tantivy() {
644        let applier_factory = build_fulltext_applier_factory(
645            "test_fulltext_index_basic_case_sensitive_tantivy_",
646            FulltextBackend::Tantivy,
647            &[
648                (Some("hello"), None, None),
649                (Some("world"), None, None),
650                (None, None, None),
651                (Some("Hello, World"), None, None),
652            ],
653        )
654        .await;
655
656        let row_ids = applier_factory(vec![(1, "hello")], vec![], None).await;
657        assert_eq!(row_ids, Some(rows([0])));
658
659        let row_ids = applier_factory(vec![(1, "world")], vec![], None).await;
660        assert_eq!(row_ids, Some(rows([1])));
661
662        let row_ids = applier_factory(vec![(1, "Hello")], vec![], None).await;
663        assert_eq!(row_ids, Some(rows([3])));
664
665        let row_ids = applier_factory(vec![(1, "World")], vec![], None).await;
666        assert_eq!(row_ids, Some(rows([3])));
667
668        let row_ids = applier_factory(vec![], vec![(1, vec![(false, "hello")])], None).await;
669        assert_eq!(row_ids, Some(rows([0])));
670
671        let row_ids = applier_factory(vec![], vec![(1, vec![(true, "hello")])], None).await;
672        assert_eq!(row_ids, None);
673
674        let row_ids = applier_factory(vec![], vec![(1, vec![(false, "world")])], None).await;
675        assert_eq!(row_ids, Some(rows([1])));
676
677        let row_ids = applier_factory(vec![], vec![(1, vec![(true, "world")])], None).await;
678        assert_eq!(row_ids, None);
679
680        let row_ids = applier_factory(vec![], vec![(1, vec![(false, "Hello")])], None).await;
681        assert_eq!(row_ids, Some(rows([3])));
682
683        let row_ids = applier_factory(vec![], vec![(1, vec![(true, "Hello")])], None).await;
684        assert_eq!(row_ids, None);
685
686        let row_ids = applier_factory(vec![], vec![(1, vec![(false, "Hello, World")])], None).await;
687        assert_eq!(row_ids, Some(rows([3])));
688
689        let row_ids = applier_factory(vec![], vec![(1, vec![(true, "Hello, World")])], None).await;
690        assert_eq!(row_ids, None);
691    }
692
693    #[tokio::test]
694    async fn test_fulltext_index_basic_case_sensitive_bloom() {
695        let applier_factory = build_fulltext_applier_factory(
696            "test_fulltext_index_basic_case_sensitive_bloom_",
697            FulltextBackend::Bloom,
698            &[
699                (Some("hello"), None, None),
700                (Some("world"), None, None),
701                (None, None, None),
702                (Some("Hello, World"), None, None),
703            ],
704        )
705        .await;
706
707        let row_ids = applier_factory(
708            vec![],
709            vec![(1, vec![(false, "hello")])],
710            Some(BitVec::from_slice(&[0b1111])),
711        )
712        .await;
713        assert_eq!(row_ids, Some(rows([0])));
714
715        let row_ids = applier_factory(
716            vec![],
717            vec![(1, vec![(false, "hello")])],
718            Some(BitVec::from_slice(&[0b1110])), // row 0 is filtered out
719        )
720        .await;
721        assert_eq!(row_ids, Some(rows([])));
722
723        let row_ids = applier_factory(
724            vec![],
725            vec![(1, vec![(true, "hello")])],
726            Some(BitVec::from_slice(&[0b1111])),
727        )
728        .await;
729        assert_eq!(row_ids, None);
730
731        let row_ids = applier_factory(
732            vec![],
733            vec![(1, vec![(false, "world")])],
734            Some(BitVec::from_slice(&[0b1111])),
735        )
736        .await;
737        assert_eq!(row_ids, Some(rows([1])));
738
739        let row_ids = applier_factory(
740            vec![],
741            vec![(1, vec![(false, "world")])],
742            Some(BitVec::from_slice(&[0b1101])), // row 1 is filtered out
743        )
744        .await;
745        assert_eq!(row_ids, Some(rows([])));
746
747        let row_ids = applier_factory(
748            vec![],
749            vec![(1, vec![(true, "world")])],
750            Some(BitVec::from_slice(&[0b1111])),
751        )
752        .await;
753        assert_eq!(row_ids, None);
754
755        let row_ids = applier_factory(
756            vec![],
757            vec![(1, vec![(false, "Hello")])],
758            Some(BitVec::from_slice(&[0b1111])),
759        )
760        .await;
761        assert_eq!(row_ids, Some(rows([3])));
762
763        let row_ids = applier_factory(
764            vec![],
765            vec![(1, vec![(false, "Hello")])],
766            Some(BitVec::from_slice(&[0b0111])), // row 3 is filtered out
767        )
768        .await;
769        assert_eq!(row_ids, Some(rows([])));
770
771        let row_ids = applier_factory(
772            vec![],
773            vec![(1, vec![(true, "Hello")])],
774            Some(BitVec::from_slice(&[0b1111])),
775        )
776        .await;
777        assert_eq!(row_ids, None);
778
779        let row_ids = applier_factory(
780            vec![],
781            vec![(1, vec![(false, "Hello, World")])],
782            Some(BitVec::from_slice(&[0b1111])),
783        )
784        .await;
785        assert_eq!(row_ids, Some(rows([3])));
786
787        let row_ids = applier_factory(
788            vec![],
789            vec![(1, vec![(false, "Hello, World")])],
790            Some(BitVec::from_slice(&[0b0111])), // row 3 is filtered out
791        )
792        .await;
793        assert_eq!(row_ids, Some(rows([])));
794
795        let row_ids = applier_factory(
796            vec![],
797            vec![(1, vec![(true, "Hello, World")])],
798            Some(BitVec::from_slice(&[0b1111])),
799        )
800        .await;
801        assert_eq!(row_ids, None);
802    }
803
804    #[tokio::test]
805    async fn test_fulltext_index_basic_case_insensitive_tantivy() {
806        let applier_factory = build_fulltext_applier_factory(
807            "test_fulltext_index_basic_case_insensitive_tantivy_",
808            FulltextBackend::Tantivy,
809            &[
810                (None, Some("hello"), None),
811                (None, None, None),
812                (None, Some("world"), None),
813                (None, Some("Hello, World"), None),
814            ],
815        )
816        .await;
817
818        let row_ids = applier_factory(vec![(2, "hello")], vec![], None).await;
819        assert_eq!(row_ids, Some(rows([0, 3])));
820
821        let row_ids = applier_factory(vec![(2, "world")], vec![], None).await;
822        assert_eq!(row_ids, Some(rows([2, 3])));
823
824        let row_ids = applier_factory(vec![(2, "Hello")], vec![], None).await;
825        assert_eq!(row_ids, Some(rows([0, 3])));
826
827        let row_ids = applier_factory(vec![(2, "World")], vec![], None).await;
828        assert_eq!(row_ids, Some(rows([2, 3])));
829
830        let row_ids = applier_factory(vec![], vec![(2, vec![(false, "hello")])], None).await;
831        assert_eq!(row_ids, Some(rows([0, 3])));
832
833        let row_ids = applier_factory(vec![], vec![(2, vec![(true, "hello")])], None).await;
834        assert_eq!(row_ids, Some(rows([0, 3])));
835
836        let row_ids = applier_factory(vec![], vec![(2, vec![(false, "world")])], None).await;
837        assert_eq!(row_ids, Some(rows([2, 3])));
838
839        let row_ids = applier_factory(vec![], vec![(2, vec![(true, "world")])], None).await;
840        assert_eq!(row_ids, Some(rows([2, 3])));
841
842        let row_ids = applier_factory(vec![], vec![(2, vec![(false, "Hello")])], None).await;
843        assert_eq!(row_ids, Some(rows([0, 3])));
844
845        let row_ids = applier_factory(vec![], vec![(2, vec![(true, "Hello")])], None).await;
846        assert_eq!(row_ids, Some(rows([0, 3])));
847
848        let row_ids = applier_factory(vec![], vec![(2, vec![(false, "World")])], None).await;
849        assert_eq!(row_ids, Some(rows([2, 3])));
850
851        let row_ids = applier_factory(vec![], vec![(2, vec![(true, "World")])], None).await;
852        assert_eq!(row_ids, Some(rows([2, 3])));
853    }
854
855    #[tokio::test]
856    async fn test_fulltext_index_basic_case_insensitive_bloom() {
857        let applier_factory = build_fulltext_applier_factory(
858            "test_fulltext_index_basic_case_insensitive_bloom_",
859            FulltextBackend::Bloom,
860            &[
861                (None, Some("hello"), None),
862                (None, None, None),
863                (None, Some("world"), None),
864                (None, Some("Hello, World"), None),
865            ],
866        )
867        .await;
868
869        let row_ids = applier_factory(
870            vec![],
871            vec![(2, vec![(false, "hello")])],
872            Some(BitVec::from_slice(&[0b1111])),
873        )
874        .await;
875        assert_eq!(row_ids, Some(rows([0, 3])));
876
877        let row_ids = applier_factory(
878            vec![],
879            vec![(2, vec![(false, "hello")])],
880            Some(BitVec::from_slice(&[0b1110])), // row 0 is filtered out
881        )
882        .await;
883        assert_eq!(row_ids, Some(rows([3])));
884
885        let row_ids = applier_factory(
886            vec![],
887            vec![(2, vec![(true, "hello")])],
888            Some(BitVec::from_slice(&[0b1111])),
889        )
890        .await;
891        assert_eq!(row_ids, Some(rows([0, 3])));
892
893        let row_ids = applier_factory(
894            vec![],
895            vec![(2, vec![(true, "hello")])],
896            Some(BitVec::from_slice(&[0b1110])), // row 0 is filtered out
897        )
898        .await;
899        assert_eq!(row_ids, Some(rows([3])));
900
901        let row_ids = applier_factory(
902            vec![],
903            vec![(2, vec![(false, "world")])],
904            Some(BitVec::from_slice(&[0b1111])),
905        )
906        .await;
907        assert_eq!(row_ids, Some(rows([2, 3])));
908
909        let row_ids = applier_factory(
910            vec![],
911            vec![(2, vec![(false, "world")])],
912            Some(BitVec::from_slice(&[0b1011])), // row 2 is filtered out
913        )
914        .await;
915        assert_eq!(row_ids, Some(rows([3])));
916
917        let row_ids = applier_factory(
918            vec![],
919            vec![(2, vec![(true, "world")])],
920            Some(BitVec::from_slice(&[0b1111])),
921        )
922        .await;
923        assert_eq!(row_ids, Some(rows([2, 3])));
924
925        let row_ids = applier_factory(
926            vec![],
927            vec![(2, vec![(true, "world")])],
928            Some(BitVec::from_slice(&[0b1011])), // row 2 is filtered out
929        )
930        .await;
931        assert_eq!(row_ids, Some(rows([3])));
932
933        let row_ids = applier_factory(
934            vec![],
935            vec![(2, vec![(false, "Hello")])],
936            Some(BitVec::from_slice(&[0b1111])),
937        )
938        .await;
939        assert_eq!(row_ids, Some(rows([0, 3])));
940
941        let row_ids = applier_factory(
942            vec![],
943            vec![(2, vec![(false, "Hello")])],
944            Some(BitVec::from_slice(&[0b0111])), // row 3 is filtered out
945        )
946        .await;
947        assert_eq!(row_ids, Some(rows([0])));
948
949        let row_ids = applier_factory(
950            vec![],
951            vec![(2, vec![(true, "Hello")])],
952            Some(BitVec::from_slice(&[0b1111])),
953        )
954        .await;
955        assert_eq!(row_ids, Some(rows([0, 3])));
956
957        let row_ids = applier_factory(
958            vec![],
959            vec![(2, vec![(true, "Hello")])],
960            Some(BitVec::from_slice(&[0b1110])), // row 0 is filtered out
961        )
962        .await;
963        assert_eq!(row_ids, Some(rows([3])));
964
965        let row_ids = applier_factory(
966            vec![],
967            vec![(2, vec![(false, "World")])],
968            Some(BitVec::from_slice(&[0b1111])),
969        )
970        .await;
971        assert_eq!(row_ids, Some(rows([2, 3])));
972
973        let row_ids = applier_factory(
974            vec![],
975            vec![(2, vec![(false, "World")])],
976            Some(BitVec::from_slice(&[0b0111])), // row 3 is filtered out
977        )
978        .await;
979        assert_eq!(row_ids, Some(rows([2])));
980
981        let row_ids = applier_factory(
982            vec![],
983            vec![(2, vec![(true, "World")])],
984            Some(BitVec::from_slice(&[0b1111])),
985        )
986        .await;
987        assert_eq!(row_ids, Some(rows([2, 3])));
988
989        let row_ids = applier_factory(
990            vec![],
991            vec![(2, vec![(true, "World")])],
992            Some(BitVec::from_slice(&[0b1011])), // row 2 is filtered out
993        )
994        .await;
995        assert_eq!(row_ids, Some(rows([3])));
996    }
997
998    #[tokio::test]
999    async fn test_fulltext_index_basic_chinese_tantivy() {
1000        let applier_factory = build_fulltext_applier_factory(
1001            "test_fulltext_index_basic_chinese_tantivy_",
1002            FulltextBackend::Tantivy,
1003            &[
1004                (None, None, Some("你好")),
1005                (None, None, None),
1006                (None, None, Some("世界")),
1007                (None, None, Some("你好,世界")),
1008            ],
1009        )
1010        .await;
1011
1012        let row_ids = applier_factory(vec![(3, "你好")], vec![], None).await;
1013        assert_eq!(row_ids, Some(rows([0, 3])));
1014
1015        let row_ids = applier_factory(vec![(3, "世界")], vec![], None).await;
1016        assert_eq!(row_ids, Some(rows([2, 3])));
1017
1018        let row_ids = applier_factory(vec![], vec![(3, vec![(false, "你好")])], None).await;
1019        assert_eq!(row_ids, Some(rows([0, 3])));
1020
1021        let row_ids = applier_factory(vec![], vec![(3, vec![(false, "世界")])], None).await;
1022        assert_eq!(row_ids, Some(rows([2, 3])));
1023    }
1024
1025    #[tokio::test]
1026    async fn test_fulltext_index_basic_chinese_bloom() {
1027        let applier_factory = build_fulltext_applier_factory(
1028            "test_fulltext_index_basic_chinese_bloom_",
1029            FulltextBackend::Bloom,
1030            &[
1031                (None, None, Some("你好")),
1032                (None, None, None),
1033                (None, None, Some("世界")),
1034                (None, None, Some("你好,世界")),
1035            ],
1036        )
1037        .await;
1038
1039        let row_ids = applier_factory(
1040            vec![],
1041            vec![(3, vec![(false, "你好")])],
1042            Some(BitVec::from_slice(&[0b1111])),
1043        )
1044        .await;
1045        assert_eq!(row_ids, Some(rows([0, 3])));
1046
1047        let row_ids = applier_factory(
1048            vec![],
1049            vec![(3, vec![(false, "你好")])],
1050            Some(BitVec::from_slice(&[0b1110])), // row 0 is filtered out
1051        )
1052        .await;
1053        assert_eq!(row_ids, Some(rows([3])));
1054
1055        let row_ids = applier_factory(
1056            vec![],
1057            vec![(3, vec![(false, "世界")])],
1058            Some(BitVec::from_slice(&[0b1111])),
1059        )
1060        .await;
1061        assert_eq!(row_ids, Some(rows([2, 3])));
1062
1063        let row_ids = applier_factory(
1064            vec![],
1065            vec![(3, vec![(false, "世界")])],
1066            Some(BitVec::from_slice(&[0b1011])), // row 2 is filtered out
1067        )
1068        .await;
1069        assert_eq!(row_ids, Some(rows([3])));
1070    }
1071
1072    #[tokio::test]
1073    async fn test_fulltext_index_multi_terms_case_sensitive_tantivy() {
1074        let applier_factory = build_fulltext_applier_factory(
1075            "test_fulltext_index_multi_terms_case_sensitive_tantivy_",
1076            FulltextBackend::Tantivy,
1077            &[
1078                (Some("Hello"), None, None),
1079                (Some("World"), None, None),
1080                (None, None, None),
1081                (Some("Hello, World"), None, None),
1082            ],
1083        )
1084        .await;
1085
1086        let row_ids = applier_factory(
1087            vec![],
1088            vec![(1, vec![(false, "hello"), (false, "world")])],
1089            None,
1090        )
1091        .await;
1092        assert_eq!(row_ids, Some(rows([])));
1093
1094        let row_ids = applier_factory(
1095            vec![],
1096            vec![(1, vec![(false, "Hello"), (false, "World")])],
1097            None,
1098        )
1099        .await;
1100        assert_eq!(row_ids, Some(rows([3])));
1101
1102        let row_ids = applier_factory(
1103            vec![],
1104            vec![(1, vec![(true, "Hello"), (false, "World")])],
1105            None,
1106        )
1107        .await;
1108        assert_eq!(row_ids, Some(rows([1, 3])));
1109
1110        let row_ids = applier_factory(
1111            vec![],
1112            vec![(1, vec![(false, "Hello"), (true, "World")])],
1113            None,
1114        )
1115        .await;
1116        assert_eq!(row_ids, Some(rows([0, 3])));
1117
1118        let row_ids = applier_factory(
1119            vec![],
1120            vec![(1, vec![(true, "Hello"), (true, "World")])],
1121            None,
1122        )
1123        .await;
1124        assert_eq!(row_ids, None);
1125    }
1126
1127    #[tokio::test]
1128    async fn test_fulltext_index_multi_terms_case_sensitive_bloom() {
1129        let applier_factory = build_fulltext_applier_factory(
1130            "test_fulltext_index_multi_terms_case_sensitive_bloom_",
1131            FulltextBackend::Bloom,
1132            &[
1133                (Some("Hello"), None, None),
1134                (Some("World"), None, None),
1135                (None, None, None),
1136                (Some("Hello, World"), None, None),
1137            ],
1138        )
1139        .await;
1140
1141        let row_ids = applier_factory(
1142            vec![],
1143            vec![(1, vec![(false, "hello"), (false, "world")])],
1144            Some(BitVec::from_slice(&[0b1111])),
1145        )
1146        .await;
1147        assert_eq!(row_ids, Some(rows([])));
1148
1149        let row_ids = applier_factory(
1150            vec![],
1151            vec![(1, vec![(false, "Hello"), (false, "World")])],
1152            Some(BitVec::from_slice(&[0b1111])),
1153        )
1154        .await;
1155        assert_eq!(row_ids, Some(rows([3])));
1156
1157        let row_ids = applier_factory(
1158            vec![],
1159            vec![(1, vec![(true, "Hello"), (false, "World")])],
1160            Some(BitVec::from_slice(&[0b1111])),
1161        )
1162        .await;
1163        assert_eq!(row_ids, Some(rows([1, 3])));
1164
1165        let row_ids = applier_factory(
1166            vec![],
1167            vec![(1, vec![(false, "Hello"), (true, "World")])],
1168            Some(BitVec::from_slice(&[0b1111])),
1169        )
1170        .await;
1171        assert_eq!(row_ids, Some(rows([0, 3])));
1172
1173        let row_ids = applier_factory(
1174            vec![],
1175            vec![(1, vec![(true, "Hello"), (true, "World")])],
1176            Some(BitVec::from_slice(&[0b1111])),
1177        )
1178        .await;
1179        assert_eq!(row_ids, None);
1180    }
1181
1182    #[tokio::test]
1183    async fn test_fulltext_index_multi_terms_case_insensitive_tantivy() {
1184        let applier_factory = build_fulltext_applier_factory(
1185            "test_fulltext_index_multi_terms_case_insensitive_tantivy_",
1186            FulltextBackend::Tantivy,
1187            &[
1188                (None, Some("hello"), None),
1189                (None, None, None),
1190                (None, Some("world"), None),
1191                (None, Some("Hello, World"), None),
1192            ],
1193        )
1194        .await;
1195
1196        let row_ids = applier_factory(
1197            vec![],
1198            vec![(2, vec![(false, "hello"), (false, "world")])],
1199            None,
1200        )
1201        .await;
1202        assert_eq!(row_ids, Some(rows([3])));
1203
1204        let row_ids = applier_factory(
1205            vec![],
1206            vec![(2, vec![(true, "hello"), (false, "world")])],
1207            None,
1208        )
1209        .await;
1210        assert_eq!(row_ids, Some(rows([3])));
1211
1212        let row_ids = applier_factory(
1213            vec![],
1214            vec![(2, vec![(false, "hello"), (true, "world")])],
1215            None,
1216        )
1217        .await;
1218        assert_eq!(row_ids, Some(rows([3])));
1219
1220        let row_ids = applier_factory(
1221            vec![],
1222            vec![(2, vec![(true, "hello"), (true, "world")])],
1223            None,
1224        )
1225        .await;
1226        assert_eq!(row_ids, Some(rows([3])));
1227    }
1228
1229    #[tokio::test]
1230    async fn test_fulltext_index_multi_terms_case_insensitive_bloom() {
1231        let applier_factory = build_fulltext_applier_factory(
1232            "test_fulltext_index_multi_terms_case_insensitive_bloom_",
1233            FulltextBackend::Bloom,
1234            &[
1235                (None, Some("hello"), None),
1236                (None, None, None),
1237                (None, Some("world"), None),
1238                (None, Some("Hello, World"), None),
1239            ],
1240        )
1241        .await;
1242
1243        let row_ids = applier_factory(
1244            vec![],
1245            vec![(2, vec![(false, "hello"), (false, "world")])],
1246            Some(BitVec::from_slice(&[0b1111])),
1247        )
1248        .await;
1249        assert_eq!(row_ids, Some(rows([3])));
1250
1251        let row_ids = applier_factory(
1252            vec![],
1253            vec![(2, vec![(true, "hello"), (false, "world")])],
1254            Some(BitVec::from_slice(&[0b1111])),
1255        )
1256        .await;
1257        assert_eq!(row_ids, Some(rows([3])));
1258
1259        let row_ids = applier_factory(
1260            vec![],
1261            vec![(2, vec![(false, "hello"), (true, "world")])],
1262            Some(BitVec::from_slice(&[0b1111])),
1263        )
1264        .await;
1265        assert_eq!(row_ids, Some(rows([3])));
1266
1267        let row_ids = applier_factory(
1268            vec![],
1269            vec![(2, vec![(true, "hello"), (true, "world")])],
1270            Some(BitVec::from_slice(&[0b1111])),
1271        )
1272        .await;
1273        assert_eq!(row_ids, Some(rows([3])));
1274    }
1275
1276    #[tokio::test]
1277    async fn test_fulltext_index_multi_columns_tantivy() {
1278        let applier_factory = build_fulltext_applier_factory(
1279            "test_fulltext_index_multi_columns_tantivy_",
1280            FulltextBackend::Tantivy,
1281            &[
1282                (Some("Hello"), None, Some("你好")),
1283                (Some("World"), Some("world"), None),
1284                (None, Some("World"), Some("世界")),
1285                (
1286                    Some("Hello, World"),
1287                    Some("Hello, World"),
1288                    Some("你好,世界"),
1289                ),
1290            ],
1291        )
1292        .await;
1293
1294        let row_ids = applier_factory(
1295            vec![(1, "Hello"), (3, "你好")],
1296            vec![(2, vec![(false, "world")])],
1297            None,
1298        )
1299        .await;
1300        assert_eq!(row_ids, Some(rows([3])));
1301
1302        let row_ids =
1303            applier_factory(vec![(2, "World")], vec![(1, vec![(false, "World")])], None).await;
1304        assert_eq!(row_ids, Some(rows([1, 3])));
1305    }
1306
1307    #[tokio::test]
1308    async fn test_fulltext_index_multi_columns_bloom() {
1309        let applier_factory = build_fulltext_applier_factory(
1310            "test_fulltext_index_multi_columns_bloom_",
1311            FulltextBackend::Bloom,
1312            &[
1313                (Some("Hello"), None, Some("你好")),
1314                (Some("World"), Some("world"), None),
1315                (None, Some("World"), Some("世界")),
1316                (
1317                    Some("Hello, World"),
1318                    Some("Hello, World"),
1319                    Some("你好,世界"),
1320                ),
1321            ],
1322        )
1323        .await;
1324
1325        let row_ids = applier_factory(
1326            vec![],
1327            vec![
1328                (1, vec![(false, "Hello")]),
1329                (2, vec![(false, "world")]),
1330                (3, vec![(false, "你好")]),
1331            ],
1332            Some(BitVec::from_slice(&[0b1111])),
1333        )
1334        .await;
1335        assert_eq!(row_ids, Some(rows([3])));
1336
1337        let row_ids = applier_factory(
1338            vec![],
1339            vec![(1, vec![(false, "World")]), (2, vec![(false, "World")])],
1340            Some(BitVec::from_slice(&[0b1111])),
1341        )
1342        .await;
1343        assert_eq!(row_ids, Some(rows([1, 3])));
1344    }
1345}