mito2/sst/index/fulltext_index/
creator.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::sync::atomic::AtomicUsize;
17use std::sync::Arc;
18
19use common_telemetry::warn;
20use datatypes::schema::{FulltextAnalyzer, FulltextBackend};
21use index::fulltext_index::create::{
22    BloomFilterFulltextIndexCreator, FulltextIndexCreator, TantivyFulltextIndexCreator,
23};
24use index::fulltext_index::{Analyzer, Config};
25use puffin::blob_metadata::CompressionCodec;
26use puffin::puffin_manager::PutOptions;
27use snafu::{ensure, ResultExt};
28use store_api::metadata::RegionMetadataRef;
29use store_api::storage::{ColumnId, ConcreteDataType, RegionId};
30
31use crate::error::{
32    CastVectorSnafu, CreateFulltextCreatorSnafu, DataTypeMismatchSnafu, FulltextFinishSnafu,
33    FulltextPushTextSnafu, IndexOptionsSnafu, OperateAbortedIndexSnafu, Result,
34};
35use crate::read::Batch;
36use crate::sst::file::FileId;
37use crate::sst::index::fulltext_index::{INDEX_BLOB_TYPE_BLOOM, INDEX_BLOB_TYPE_TANTIVY};
38use crate::sst::index::intermediate::{
39    IntermediateLocation, IntermediateManager, TempFileProvider,
40};
41use crate::sst::index::puffin_manager::SstPuffinWriter;
42use crate::sst::index::statistics::{ByteCount, RowCount, Statistics};
43use crate::sst::index::TYPE_FULLTEXT_INDEX;
44
/// `FulltextIndexer` is responsible for creating fulltext indexes for SST files.
///
/// It holds one `SingleCreator` per column that has fulltext indexing enabled
/// and forwards every batch to all of them.
pub struct FulltextIndexer {
    /// Creators for each column, keyed by column ID.
    creators: HashMap<ColumnId, SingleCreator>,
    /// Whether the index creation was aborted.
    ///
    /// Set by `do_abort`; while it is set, `update` and `finish` are rejected
    /// and `abort` returns immediately.
    aborted: bool,
    /// Statistics of index creation (row count and written byte count are
    /// reported from here by `finish`).
    stats: Statistics,
}
54
55impl FulltextIndexer {
56    /// Creates a new `FulltextIndexer`.
57    pub async fn new(
58        region_id: &RegionId,
59        sst_file_id: &FileId,
60        intermediate_manager: &IntermediateManager,
61        metadata: &RegionMetadataRef,
62        compress: bool,
63        mem_limit: usize,
64    ) -> Result<Option<Self>> {
65        let mut creators = HashMap::new();
66
67        for column in &metadata.column_metadatas {
68            let options = column
69                .column_schema
70                .fulltext_options()
71                .context(IndexOptionsSnafu {
72                    column_name: &column.column_schema.name,
73                })?;
74
75            // Relax the type constraint here as many types can be casted to string.
76
77            let options = match options {
78                Some(options) if options.enable => options,
79                _ => continue,
80            };
81
82            let column_id = column.column_id;
83            let intm_path = intermediate_manager.fulltext_path(region_id, sst_file_id, column_id);
84
85            let config = Config {
86                analyzer: match options.analyzer {
87                    FulltextAnalyzer::English => Analyzer::English,
88                    FulltextAnalyzer::Chinese => Analyzer::Chinese,
89                },
90                case_sensitive: options.case_sensitive,
91            };
92
93            let inner = match options.backend {
94                FulltextBackend::Tantivy => {
95                    let creator = TantivyFulltextIndexCreator::new(&intm_path, config, mem_limit)
96                        .await
97                        .context(CreateFulltextCreatorSnafu)?;
98                    AltFulltextCreator::Tantivy(creator)
99                }
100                FulltextBackend::Bloom => {
101                    let temp_file_provider = Arc::new(TempFileProvider::new(
102                        IntermediateLocation::new(&metadata.region_id, sst_file_id),
103                        intermediate_manager.clone(),
104                    ));
105                    let global_memory_usage = Arc::new(AtomicUsize::new(0));
106                    let creator = BloomFilterFulltextIndexCreator::new(
107                        config,
108                        options.granularity as _,
109                        options.false_positive_rate(),
110                        temp_file_provider,
111                        global_memory_usage,
112                        Some(mem_limit),
113                    );
114                    AltFulltextCreator::Bloom(creator)
115                }
116            };
117
118            creators.insert(
119                column_id,
120                SingleCreator {
121                    column_id,
122                    inner,
123                    compress,
124                },
125            );
126        }
127
128        Ok((!creators.is_empty()).then(move || Self {
129            creators,
130            aborted: false,
131            stats: Statistics::new(TYPE_FULLTEXT_INDEX),
132        }))
133    }
134
135    /// Updates the index with the given batch.
136    pub async fn update(&mut self, batch: &mut Batch) -> Result<()> {
137        ensure!(!self.aborted, OperateAbortedIndexSnafu);
138
139        if let Err(update_err) = self.do_update(batch).await {
140            if let Err(err) = self.do_abort().await {
141                if cfg!(any(test, feature = "test")) {
142                    panic!("Failed to abort index creator, err: {err}");
143                } else {
144                    warn!(err; "Failed to abort index creator");
145                }
146            }
147            return Err(update_err);
148        }
149
150        Ok(())
151    }
152
153    /// Finalizes the index creation.
154    pub async fn finish(
155        &mut self,
156        puffin_writer: &mut SstPuffinWriter,
157    ) -> Result<(RowCount, ByteCount)> {
158        ensure!(!self.aborted, OperateAbortedIndexSnafu);
159
160        match self.do_finish(puffin_writer).await {
161            Ok(()) => Ok((self.stats.row_count(), self.stats.byte_count())),
162            Err(finish_err) => {
163                if let Err(err) = self.do_abort().await {
164                    if cfg!(any(test, feature = "test")) {
165                        panic!("Failed to abort index creator, err: {err}");
166                    } else {
167                        warn!(err; "Failed to abort index creator");
168                    }
169                }
170                Err(finish_err)
171            }
172        }
173    }
174
175    /// Aborts the index creation.
176    pub async fn abort(&mut self) -> Result<()> {
177        if self.aborted {
178            return Ok(());
179        }
180
181        self.do_abort().await
182    }
183
184    /// Returns the memory usage of the index creator.
185    pub fn memory_usage(&self) -> usize {
186        self.creators.values().map(|c| c.inner.memory_usage()).sum()
187    }
188
189    /// Returns IDs of columns that the creator is responsible for.
190    pub fn column_ids(&self) -> impl Iterator<Item = ColumnId> + '_ {
191        self.creators.keys().copied()
192    }
193}
194
195impl FulltextIndexer {
196    async fn do_update(&mut self, batch: &mut Batch) -> Result<()> {
197        let mut guard = self.stats.record_update();
198        guard.inc_row_count(batch.num_rows());
199
200        for creator in self.creators.values_mut() {
201            creator.update(batch).await?;
202        }
203
204        Ok(())
205    }
206
207    async fn do_finish(&mut self, puffin_writer: &mut SstPuffinWriter) -> Result<()> {
208        let mut guard = self.stats.record_finish();
209
210        let mut written_bytes = 0;
211        for creator in self.creators.values_mut() {
212            written_bytes += creator.finish(puffin_writer).await?;
213        }
214
215        guard.inc_byte_count(written_bytes);
216        Ok(())
217    }
218
219    async fn do_abort(&mut self) -> Result<()> {
220        let _guard = self.stats.record_cleanup();
221
222        self.aborted = true;
223
224        for (_, mut creator) in self.creators.drain() {
225            creator.abort().await?;
226        }
227
228        Ok(())
229    }
230}
231
/// `SingleCreator` is a creator for a single column.
///
/// It pairs a backend-specific creator with the column it indexes and the
/// compression choice used when the index is written out.
struct SingleCreator {
    /// Column ID. Used to locate the column in each batch and to name the
    /// index blob on finish.
    column_id: ColumnId,
    /// Inner creator.
    inner: AltFulltextCreator,
    /// Whether the index should be compressed (Zstd is used when set).
    compress: bool,
}
241
242impl SingleCreator {
243    async fn update(&mut self, batch: &mut Batch) -> Result<()> {
244        let text_column = batch
245            .fields()
246            .iter()
247            .find(|c| c.column_id == self.column_id);
248        match text_column {
249            Some(column) => {
250                let data = column
251                    .data
252                    .cast(&ConcreteDataType::string_datatype())
253                    .context(CastVectorSnafu {
254                        from: column.data.data_type(),
255                        to: ConcreteDataType::string_datatype(),
256                    })?;
257
258                for i in 0..batch.num_rows() {
259                    let data = data.get_ref(i);
260                    let text = data
261                        .as_string()
262                        .context(DataTypeMismatchSnafu)?
263                        .unwrap_or_default();
264                    self.inner.push_text(text).await?;
265                }
266            }
267            _ => {
268                // If the column is not found in the batch, push empty text.
269                // Ensure that the number of texts pushed is the same as the number of rows in the SST,
270                // so that the texts are aligned with the row ids.
271                for _ in 0..batch.num_rows() {
272                    self.inner.push_text("").await?;
273                }
274            }
275        }
276
277        Ok(())
278    }
279
280    async fn finish(&mut self, puffin_writer: &mut SstPuffinWriter) -> Result<ByteCount> {
281        let options = PutOptions {
282            compression: self.compress.then_some(CompressionCodec::Zstd),
283        };
284        self.inner
285            .finish(puffin_writer, &self.column_id, options)
286            .await
287    }
288
289    async fn abort(&mut self) -> Result<()> {
290        self.inner.abort(&self.column_id).await;
291        Ok(())
292    }
293}
294
/// `AltFulltextCreator` is an alternative fulltext index creator that can be either Tantivy or BloomFilter.
///
/// The variant is chosen from the column's `FulltextBackend` when the indexer
/// is created.
#[allow(dead_code, clippy::large_enum_variant)]
enum AltFulltextCreator {
    /// Creator used for `FulltextBackend::Tantivy`.
    Tantivy(TantivyFulltextIndexCreator),
    /// Creator used for `FulltextBackend::Bloom`.
    Bloom(BloomFilterFulltextIndexCreator),
}
301
302impl AltFulltextCreator {
303    async fn push_text(&mut self, text: &str) -> Result<()> {
304        match self {
305            Self::Tantivy(creator) => creator.push_text(text).await.context(FulltextPushTextSnafu),
306            Self::Bloom(creator) => creator.push_text(text).await.context(FulltextPushTextSnafu),
307        }
308    }
309
310    fn memory_usage(&self) -> usize {
311        match self {
312            Self::Tantivy(creator) => creator.memory_usage(),
313            Self::Bloom(creator) => creator.memory_usage(),
314        }
315    }
316
317    async fn finish(
318        &mut self,
319        puffin_writer: &mut SstPuffinWriter,
320        column_id: &ColumnId,
321        put_options: PutOptions,
322    ) -> Result<ByteCount> {
323        match self {
324            Self::Tantivy(creator) => {
325                let key = format!("{INDEX_BLOB_TYPE_TANTIVY}-{}", column_id);
326                creator
327                    .finish(puffin_writer, &key, put_options)
328                    .await
329                    .context(FulltextFinishSnafu)
330            }
331            Self::Bloom(creator) => {
332                let key = format!("{INDEX_BLOB_TYPE_BLOOM}-{}", column_id);
333                creator
334                    .finish(puffin_writer, &key, put_options)
335                    .await
336                    .context(FulltextFinishSnafu)
337            }
338        }
339    }
340
341    async fn abort(&mut self, column_id: &ColumnId) {
342        match self {
343            Self::Tantivy(creator) => {
344                if let Err(err) = creator.abort().await {
345                    warn!(err; "Failed to abort the fulltext index creator in the Tantivy flavor, col_id: {:?}", column_id);
346                }
347            }
348            Self::Bloom(creator) => {
349                if let Err(err) = creator.abort().await {
350                    warn!(err; "Failed to abort the fulltext index creator in the Bloom Filter flavor, col_id: {:?}", column_id);
351                }
352            }
353        }
354    }
355}
356
357#[cfg(test)]
358mod tests {
359    use std::collections::{BTreeMap, BTreeSet};
360    use std::sync::Arc;
361
362    use api::v1::SemanticType;
363    use common_base::BitVec;
364    use datatypes::data_type::DataType;
365    use datatypes::schema::{ColumnSchema, FulltextAnalyzer, FulltextOptions};
366    use datatypes::vectors::{UInt64Vector, UInt8Vector};
367    use futures::future::BoxFuture;
368    use futures::FutureExt;
369    use index::fulltext_index::search::RowId;
370    use object_store::services::Memory;
371    use object_store::ObjectStore;
372    use puffin::puffin_manager::{PuffinManager, PuffinWriter};
373    use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder, RegionMetadataRef};
374    use store_api::region_request::PathType;
375    use store_api::storage::{ConcreteDataType, RegionId};
376
377    use super::*;
378    use crate::access_layer::RegionFilePathFactory;
379    use crate::read::{Batch, BatchColumn};
380    use crate::sst::file::{FileId, RegionFileId};
381    use crate::sst::index::fulltext_index::applier::builder::{
382        FulltextQuery, FulltextRequest, FulltextTerm,
383    };
384    use crate::sst::index::fulltext_index::applier::FulltextIndexApplier;
385    use crate::sst::index::puffin_manager::PuffinManagerFactory;
386
387    fn mock_object_store() -> ObjectStore {
388        ObjectStore::new(Memory::default()).unwrap().finish()
389    }
390
391    async fn new_intm_mgr(path: impl AsRef<str>) -> IntermediateManager {
392        IntermediateManager::init_fs(path).await.unwrap()
393    }
394
395    fn mock_region_metadata(backend: FulltextBackend) -> RegionMetadataRef {
396        let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 2));
397        builder
398            .push_column_metadata(ColumnMetadata {
399                column_schema: ColumnSchema::new(
400                    "text_english_case_sensitive",
401                    ConcreteDataType::string_datatype(),
402                    true,
403                )
404                .with_fulltext_options(FulltextOptions::new_unchecked(
405                    true,
406                    FulltextAnalyzer::English,
407                    true,
408                    backend.clone(),
409                    1,
410                    0.01,
411                ))
412                .unwrap(),
413                semantic_type: SemanticType::Field,
414                column_id: 1,
415            })
416            .push_column_metadata(ColumnMetadata {
417                column_schema: ColumnSchema::new(
418                    "text_english_case_insensitive",
419                    ConcreteDataType::string_datatype(),
420                    true,
421                )
422                .with_fulltext_options(FulltextOptions::new_unchecked(
423                    true,
424                    FulltextAnalyzer::English,
425                    false,
426                    backend.clone(),
427                    1,
428                    0.01,
429                ))
430                .unwrap(),
431                semantic_type: SemanticType::Field,
432                column_id: 2,
433            })
434            .push_column_metadata(ColumnMetadata {
435                column_schema: ColumnSchema::new(
436                    "text_chinese",
437                    ConcreteDataType::string_datatype(),
438                    true,
439                )
440                .with_fulltext_options(FulltextOptions::new_unchecked(
441                    true,
442                    FulltextAnalyzer::Chinese,
443                    false,
444                    backend.clone(),
445                    1,
446                    0.01,
447                ))
448                .unwrap(),
449                semantic_type: SemanticType::Field,
450                column_id: 3,
451            })
452            .push_column_metadata(ColumnMetadata {
453                column_schema: ColumnSchema::new(
454                    "ts",
455                    ConcreteDataType::timestamp_millisecond_datatype(),
456                    false,
457                ),
458                semantic_type: SemanticType::Timestamp,
459                column_id: 4,
460            });
461
462        Arc::new(builder.build().unwrap())
463    }
464
465    fn new_batch(
466        rows: &[(
467            Option<&str>, // text_english_case_sensitive
468            Option<&str>, // text_english_case_insensitive
469            Option<&str>, // text_chinese
470        )],
471    ) -> Batch {
472        let mut vec_english_sensitive =
473            ConcreteDataType::string_datatype().create_mutable_vector(0);
474        let mut vec_english_insensitive =
475            ConcreteDataType::string_datatype().create_mutable_vector(0);
476        let mut vec_chinese = ConcreteDataType::string_datatype().create_mutable_vector(0);
477
478        for (text_english_case_sensitive, text_english_case_insensitive, text_chinese) in rows {
479            match text_english_case_sensitive {
480                Some(s) => vec_english_sensitive.push_value_ref((*s).into()),
481                None => vec_english_sensitive.push_null(),
482            }
483            match text_english_case_insensitive {
484                Some(s) => vec_english_insensitive.push_value_ref((*s).into()),
485                None => vec_english_insensitive.push_null(),
486            }
487            match text_chinese {
488                Some(s) => vec_chinese.push_value_ref((*s).into()),
489                None => vec_chinese.push_null(),
490            }
491        }
492
493        let num_rows = vec_english_sensitive.len();
494        Batch::new(
495            vec![],
496            Arc::new(UInt64Vector::from_iter_values(
497                (0..num_rows).map(|n| n as u64),
498            )),
499            Arc::new(UInt64Vector::from_iter_values(std::iter::repeat_n(
500                0, num_rows,
501            ))),
502            Arc::new(UInt8Vector::from_iter_values(std::iter::repeat_n(
503                1, num_rows,
504            ))),
505            vec![
506                BatchColumn {
507                    column_id: 1,
508                    data: vec_english_sensitive.to_vector(),
509                },
510                BatchColumn {
511                    column_id: 2,
512                    data: vec_english_insensitive.to_vector(),
513                },
514                BatchColumn {
515                    column_id: 3,
516                    data: vec_chinese.to_vector(),
517                },
518            ],
519        )
520        .unwrap()
521    }
522
    /// Applier factory that can handle both queries and terms.
    ///
    /// It builds a fulltext index with the given data rows, and returns a function
    /// that can handle both queries and terms in a single request.
    ///
    /// The index is built once, up front: a `FulltextIndexer` is created for the
    /// mock region metadata with the given `backend`, fed a single batch made from
    /// `rows`, and finished into a puffin writer.
    ///
    /// The returned function takes three parameters:
    /// - `queries`: A list of (ColumnId, query_string) pairs for fulltext queries
    /// - `terms`: A list of (ColumnId, [(bool, &str)]) for fulltext terms, where the bool is
    ///   passed as `FulltextTerm::col_lowered` and the string as the term
    /// - `coarse_mask`: Only read for the Bloom backend, where each bit becomes one
    ///   row group of length 1; the Tantivy backend ignores it
    ///
    /// It resolves to the matching row ids, or `None` when the applier returns `None`.
    async fn build_fulltext_applier_factory(
        prefix: &str,
        backend: FulltextBackend,
        rows: &[(
            Option<&str>, // text_english_case_sensitive
            Option<&str>, // text_english_case_insensitive
            Option<&str>, // text_chinese
        )],
    ) -> impl Fn(
        Vec<(ColumnId, &str)>,
        Vec<(ColumnId, Vec<(bool, &str)>)>,
        Option<BitVec>,
    ) -> BoxFuture<'static, Option<BTreeSet<RowId>>> {
        let (d, factory) = PuffinManagerFactory::new_for_test_async(prefix).await;
        let table_dir = "table0".to_string();
        let sst_file_id = FileId::random();
        let object_store = mock_object_store();
        let region_metadata = mock_region_metadata(backend.clone());
        let intm_mgr = new_intm_mgr(d.path().to_string_lossy()).await;

        // Compression enabled, memory limit of 1024.
        let mut indexer = FulltextIndexer::new(
            &region_metadata.region_id,
            &sst_file_id,
            &intm_mgr,
            &region_metadata,
            true,
            1024,
        )
        .await
        .unwrap()
        .unwrap();

        let mut batch = new_batch(rows);
        indexer.update(&mut batch).await.unwrap();

        let puffin_manager = factory.build(
            object_store.clone(),
            RegionFilePathFactory::new(table_dir.clone(), PathType::Bare),
        );
        let region_file_id = RegionFileId::new(region_metadata.region_id, sst_file_id);
        let mut writer = puffin_manager.writer(&region_file_id).await.unwrap();
        let _ = indexer.finish(&mut writer).await.unwrap();
        writer.finish().await.unwrap();

        move |queries: Vec<(ColumnId, &str)>,
              terms_requests: Vec<(ColumnId, Vec<(bool, &str)>)>,
              coarse_mask: Option<BitVec>| {
            // Referencing `d` makes the closure capture it, so it lives as long as
            // the closure does (presumably it guards the test directory - confirm).
            let _d = &d;
            let table_dir = table_dir.clone();
            let object_store = object_store.clone();
            let factory = factory.clone();

            // Queries and terms for the same column are merged into one request.
            let mut requests: BTreeMap<ColumnId, FulltextRequest> = BTreeMap::new();

            // Add queries
            for (column_id, query) in queries {
                requests
                    .entry(column_id)
                    .or_default()
                    .queries
                    .push(FulltextQuery(query.to_string()));
            }

            // Add terms
            for (column_id, terms) in terms_requests {
                let fulltext_terms = terms
                    .into_iter()
                    .map(|(col_lowered, term)| FulltextTerm {
                        col_lowered,
                        term: term.to_string(),
                    })
                    .collect::<Vec<_>>();

                requests
                    .entry(column_id)
                    .or_default()
                    .terms
                    .extend(fulltext_terms);
            }

            let applier = FulltextIndexApplier::new(
                table_dir,
                PathType::Bare,
                object_store,
                requests,
                factory,
            );

            let backend = backend.clone();
            async move {
                match backend {
                    FulltextBackend::Tantivy => {
                        applier.apply_fine(region_file_id, None).await.unwrap()
                    }
                    FulltextBackend::Bloom => {
                        // A missing mask yields an empty mask, hence no row groups.
                        let coarse_mask = coarse_mask.unwrap_or_default();
                        let row_groups = (0..coarse_mask.len()).map(|i| (1, coarse_mask[i]));
                        // row group id == row id
                        let resp = applier
                            .apply_coarse(region_file_id, None, row_groups)
                            .await
                            .unwrap();
                        // Keep only row groups that still have ranges left.
                        resp.map(|r| {
                            r.into_iter()
                                .filter(|(_, ranges)| !ranges.is_empty())
                                .map(|(row_group_id, _)| row_group_id as RowId)
                                .collect()
                        })
                    }
                }
            }
            .boxed()
        }
    }
645
646    fn rows(row_ids: impl IntoIterator<Item = RowId>) -> BTreeSet<RowId> {
647        row_ids.into_iter().collect()
648    }
649
650    #[tokio::test]
651    async fn test_fulltext_index_basic_case_sensitive_tantivy() {
652        let applier_factory = build_fulltext_applier_factory(
653            "test_fulltext_index_basic_case_sensitive_tantivy_",
654            FulltextBackend::Tantivy,
655            &[
656                (Some("hello"), None, None),
657                (Some("world"), None, None),
658                (None, None, None),
659                (Some("Hello, World"), None, None),
660            ],
661        )
662        .await;
663
664        let row_ids = applier_factory(vec![(1, "hello")], vec![], None).await;
665        assert_eq!(row_ids, Some(rows([0])));
666
667        let row_ids = applier_factory(vec![(1, "world")], vec![], None).await;
668        assert_eq!(row_ids, Some(rows([1])));
669
670        let row_ids = applier_factory(vec![(1, "Hello")], vec![], None).await;
671        assert_eq!(row_ids, Some(rows([3])));
672
673        let row_ids = applier_factory(vec![(1, "World")], vec![], None).await;
674        assert_eq!(row_ids, Some(rows([3])));
675
676        let row_ids = applier_factory(vec![], vec![(1, vec![(false, "hello")])], None).await;
677        assert_eq!(row_ids, Some(rows([0])));
678
679        let row_ids = applier_factory(vec![], vec![(1, vec![(true, "hello")])], None).await;
680        assert_eq!(row_ids, None);
681
682        let row_ids = applier_factory(vec![], vec![(1, vec![(false, "world")])], None).await;
683        assert_eq!(row_ids, Some(rows([1])));
684
685        let row_ids = applier_factory(vec![], vec![(1, vec![(true, "world")])], None).await;
686        assert_eq!(row_ids, None);
687
688        let row_ids = applier_factory(vec![], vec![(1, vec![(false, "Hello")])], None).await;
689        assert_eq!(row_ids, Some(rows([3])));
690
691        let row_ids = applier_factory(vec![], vec![(1, vec![(true, "Hello")])], None).await;
692        assert_eq!(row_ids, None);
693
694        let row_ids = applier_factory(vec![], vec![(1, vec![(false, "Hello, World")])], None).await;
695        assert_eq!(row_ids, Some(rows([3])));
696
697        let row_ids = applier_factory(vec![], vec![(1, vec![(true, "Hello, World")])], None).await;
698        assert_eq!(row_ids, None);
699    }
700
701    #[tokio::test]
702    async fn test_fulltext_index_basic_case_sensitive_bloom() {
703        let applier_factory = build_fulltext_applier_factory(
704            "test_fulltext_index_basic_case_sensitive_bloom_",
705            FulltextBackend::Bloom,
706            &[
707                (Some("hello"), None, None),
708                (Some("world"), None, None),
709                (None, None, None),
710                (Some("Hello, World"), None, None),
711            ],
712        )
713        .await;
714
715        let row_ids = applier_factory(
716            vec![],
717            vec![(1, vec![(false, "hello")])],
718            Some(BitVec::from_slice(&[0b1111])),
719        )
720        .await;
721        assert_eq!(row_ids, Some(rows([0])));
722
723        let row_ids = applier_factory(
724            vec![],
725            vec![(1, vec![(false, "hello")])],
726            Some(BitVec::from_slice(&[0b1110])), // row 0 is filtered out
727        )
728        .await;
729        assert_eq!(row_ids, Some(rows([])));
730
731        let row_ids = applier_factory(
732            vec![],
733            vec![(1, vec![(true, "hello")])],
734            Some(BitVec::from_slice(&[0b1111])),
735        )
736        .await;
737        assert_eq!(row_ids, None);
738
739        let row_ids = applier_factory(
740            vec![],
741            vec![(1, vec![(false, "world")])],
742            Some(BitVec::from_slice(&[0b1111])),
743        )
744        .await;
745        assert_eq!(row_ids, Some(rows([1])));
746
747        let row_ids = applier_factory(
748            vec![],
749            vec![(1, vec![(false, "world")])],
750            Some(BitVec::from_slice(&[0b1101])), // row 1 is filtered out
751        )
752        .await;
753        assert_eq!(row_ids, Some(rows([])));
754
755        let row_ids = applier_factory(
756            vec![],
757            vec![(1, vec![(true, "world")])],
758            Some(BitVec::from_slice(&[0b1111])),
759        )
760        .await;
761        assert_eq!(row_ids, None);
762
763        let row_ids = applier_factory(
764            vec![],
765            vec![(1, vec![(false, "Hello")])],
766            Some(BitVec::from_slice(&[0b1111])),
767        )
768        .await;
769        assert_eq!(row_ids, Some(rows([3])));
770
771        let row_ids = applier_factory(
772            vec![],
773            vec![(1, vec![(false, "Hello")])],
774            Some(BitVec::from_slice(&[0b0111])), // row 3 is filtered out
775        )
776        .await;
777        assert_eq!(row_ids, Some(rows([])));
778
779        let row_ids = applier_factory(
780            vec![],
781            vec![(1, vec![(true, "Hello")])],
782            Some(BitVec::from_slice(&[0b1111])),
783        )
784        .await;
785        assert_eq!(row_ids, None);
786
787        let row_ids = applier_factory(
788            vec![],
789            vec![(1, vec![(false, "Hello, World")])],
790            Some(BitVec::from_slice(&[0b1111])),
791        )
792        .await;
793        assert_eq!(row_ids, Some(rows([3])));
794
795        let row_ids = applier_factory(
796            vec![],
797            vec![(1, vec![(false, "Hello, World")])],
798            Some(BitVec::from_slice(&[0b0111])), // row 3 is filtered out
799        )
800        .await;
801        assert_eq!(row_ids, Some(rows([])));
802
803        let row_ids = applier_factory(
804            vec![],
805            vec![(1, vec![(true, "Hello, World")])],
806            Some(BitVec::from_slice(&[0b1111])),
807        )
808        .await;
809        assert_eq!(row_ids, None);
810    }
811
812    #[tokio::test]
813    async fn test_fulltext_index_basic_case_insensitive_tantivy() {
814        let applier_factory = build_fulltext_applier_factory(
815            "test_fulltext_index_basic_case_insensitive_tantivy_",
816            FulltextBackend::Tantivy,
817            &[
818                (None, Some("hello"), None),
819                (None, None, None),
820                (None, Some("world"), None),
821                (None, Some("Hello, World"), None),
822            ],
823        )
824        .await;
825
826        let row_ids = applier_factory(vec![(2, "hello")], vec![], None).await;
827        assert_eq!(row_ids, Some(rows([0, 3])));
828
829        let row_ids = applier_factory(vec![(2, "world")], vec![], None).await;
830        assert_eq!(row_ids, Some(rows([2, 3])));
831
832        let row_ids = applier_factory(vec![(2, "Hello")], vec![], None).await;
833        assert_eq!(row_ids, Some(rows([0, 3])));
834
835        let row_ids = applier_factory(vec![(2, "World")], vec![], None).await;
836        assert_eq!(row_ids, Some(rows([2, 3])));
837
838        let row_ids = applier_factory(vec![], vec![(2, vec![(false, "hello")])], None).await;
839        assert_eq!(row_ids, Some(rows([0, 3])));
840
841        let row_ids = applier_factory(vec![], vec![(2, vec![(true, "hello")])], None).await;
842        assert_eq!(row_ids, Some(rows([0, 3])));
843
844        let row_ids = applier_factory(vec![], vec![(2, vec![(false, "world")])], None).await;
845        assert_eq!(row_ids, Some(rows([2, 3])));
846
847        let row_ids = applier_factory(vec![], vec![(2, vec![(true, "world")])], None).await;
848        assert_eq!(row_ids, Some(rows([2, 3])));
849
850        let row_ids = applier_factory(vec![], vec![(2, vec![(false, "Hello")])], None).await;
851        assert_eq!(row_ids, Some(rows([0, 3])));
852
853        let row_ids = applier_factory(vec![], vec![(2, vec![(true, "Hello")])], None).await;
854        assert_eq!(row_ids, Some(rows([0, 3])));
855
856        let row_ids = applier_factory(vec![], vec![(2, vec![(false, "World")])], None).await;
857        assert_eq!(row_ids, Some(rows([2, 3])));
858
859        let row_ids = applier_factory(vec![], vec![(2, vec![(true, "World")])], None).await;
860        assert_eq!(row_ids, Some(rows([2, 3])));
861    }
862
863    #[tokio::test]
864    async fn test_fulltext_index_basic_case_insensitive_bloom() {
865        let applier_factory = build_fulltext_applier_factory(
866            "test_fulltext_index_basic_case_insensitive_bloom_",
867            FulltextBackend::Bloom,
868            &[
869                (None, Some("hello"), None),
870                (None, None, None),
871                (None, Some("world"), None),
872                (None, Some("Hello, World"), None),
873            ],
874        )
875        .await;
876
877        let row_ids = applier_factory(
878            vec![],
879            vec![(2, vec![(false, "hello")])],
880            Some(BitVec::from_slice(&[0b1111])),
881        )
882        .await;
883        assert_eq!(row_ids, Some(rows([0, 3])));
884
885        let row_ids = applier_factory(
886            vec![],
887            vec![(2, vec![(false, "hello")])],
888            Some(BitVec::from_slice(&[0b1110])), // row 0 is filtered out
889        )
890        .await;
891        assert_eq!(row_ids, Some(rows([3])));
892
893        let row_ids = applier_factory(
894            vec![],
895            vec![(2, vec![(true, "hello")])],
896            Some(BitVec::from_slice(&[0b1111])),
897        )
898        .await;
899        assert_eq!(row_ids, Some(rows([0, 3])));
900
901        let row_ids = applier_factory(
902            vec![],
903            vec![(2, vec![(true, "hello")])],
904            Some(BitVec::from_slice(&[0b1110])), // row 0 is filtered out
905        )
906        .await;
907        assert_eq!(row_ids, Some(rows([3])));
908
909        let row_ids = applier_factory(
910            vec![],
911            vec![(2, vec![(false, "world")])],
912            Some(BitVec::from_slice(&[0b1111])),
913        )
914        .await;
915        assert_eq!(row_ids, Some(rows([2, 3])));
916
917        let row_ids = applier_factory(
918            vec![],
919            vec![(2, vec![(false, "world")])],
920            Some(BitVec::from_slice(&[0b1011])), // row 2 is filtered out
921        )
922        .await;
923        assert_eq!(row_ids, Some(rows([3])));
924
925        let row_ids = applier_factory(
926            vec![],
927            vec![(2, vec![(true, "world")])],
928            Some(BitVec::from_slice(&[0b1111])),
929        )
930        .await;
931        assert_eq!(row_ids, Some(rows([2, 3])));
932
933        let row_ids = applier_factory(
934            vec![],
935            vec![(2, vec![(true, "world")])],
936            Some(BitVec::from_slice(&[0b1011])), // row 2 is filtered out
937        )
938        .await;
939        assert_eq!(row_ids, Some(rows([3])));
940
941        let row_ids = applier_factory(
942            vec![],
943            vec![(2, vec![(false, "Hello")])],
944            Some(BitVec::from_slice(&[0b1111])),
945        )
946        .await;
947        assert_eq!(row_ids, Some(rows([0, 3])));
948
949        let row_ids = applier_factory(
950            vec![],
951            vec![(2, vec![(false, "Hello")])],
952            Some(BitVec::from_slice(&[0b0111])), // row 3 is filtered out
953        )
954        .await;
955        assert_eq!(row_ids, Some(rows([0])));
956
957        let row_ids = applier_factory(
958            vec![],
959            vec![(2, vec![(true, "Hello")])],
960            Some(BitVec::from_slice(&[0b1111])),
961        )
962        .await;
963        assert_eq!(row_ids, Some(rows([0, 3])));
964
965        let row_ids = applier_factory(
966            vec![],
967            vec![(2, vec![(true, "Hello")])],
968            Some(BitVec::from_slice(&[0b1110])), // row 0 is filtered out
969        )
970        .await;
971        assert_eq!(row_ids, Some(rows([3])));
972
973        let row_ids = applier_factory(
974            vec![],
975            vec![(2, vec![(false, "World")])],
976            Some(BitVec::from_slice(&[0b1111])),
977        )
978        .await;
979        assert_eq!(row_ids, Some(rows([2, 3])));
980
981        let row_ids = applier_factory(
982            vec![],
983            vec![(2, vec![(false, "World")])],
984            Some(BitVec::from_slice(&[0b0111])), // row 3 is filtered out
985        )
986        .await;
987        assert_eq!(row_ids, Some(rows([2])));
988
989        let row_ids = applier_factory(
990            vec![],
991            vec![(2, vec![(true, "World")])],
992            Some(BitVec::from_slice(&[0b1111])),
993        )
994        .await;
995        assert_eq!(row_ids, Some(rows([2, 3])));
996
997        let row_ids = applier_factory(
998            vec![],
999            vec![(2, vec![(true, "World")])],
1000            Some(BitVec::from_slice(&[0b1011])), // row 2 is filtered out
1001        )
1002        .await;
1003        assert_eq!(row_ids, Some(rows([3])));
1004    }
1005
1006    #[tokio::test]
1007    async fn test_fulltext_index_basic_chinese_tantivy() {
1008        let applier_factory = build_fulltext_applier_factory(
1009            "test_fulltext_index_basic_chinese_tantivy_",
1010            FulltextBackend::Tantivy,
1011            &[
1012                (None, None, Some("你好")),
1013                (None, None, None),
1014                (None, None, Some("世界")),
1015                (None, None, Some("你好,世界")),
1016            ],
1017        )
1018        .await;
1019
1020        let row_ids = applier_factory(vec![(3, "你好")], vec![], None).await;
1021        assert_eq!(row_ids, Some(rows([0, 3])));
1022
1023        let row_ids = applier_factory(vec![(3, "世界")], vec![], None).await;
1024        assert_eq!(row_ids, Some(rows([2, 3])));
1025
1026        let row_ids = applier_factory(vec![], vec![(3, vec![(false, "你好")])], None).await;
1027        assert_eq!(row_ids, Some(rows([0, 3])));
1028
1029        let row_ids = applier_factory(vec![], vec![(3, vec![(false, "世界")])], None).await;
1030        assert_eq!(row_ids, Some(rows([2, 3])));
1031    }
1032
1033    #[tokio::test]
1034    async fn test_fulltext_index_basic_chinese_bloom() {
1035        let applier_factory = build_fulltext_applier_factory(
1036            "test_fulltext_index_basic_chinese_bloom_",
1037            FulltextBackend::Bloom,
1038            &[
1039                (None, None, Some("你好")),
1040                (None, None, None),
1041                (None, None, Some("世界")),
1042                (None, None, Some("你好,世界")),
1043            ],
1044        )
1045        .await;
1046
1047        let row_ids = applier_factory(
1048            vec![],
1049            vec![(3, vec![(false, "你好")])],
1050            Some(BitVec::from_slice(&[0b1111])),
1051        )
1052        .await;
1053        assert_eq!(row_ids, Some(rows([0, 3])));
1054
1055        let row_ids = applier_factory(
1056            vec![],
1057            vec![(3, vec![(false, "你好")])],
1058            Some(BitVec::from_slice(&[0b1110])), // row 0 is filtered out
1059        )
1060        .await;
1061        assert_eq!(row_ids, Some(rows([3])));
1062
1063        let row_ids = applier_factory(
1064            vec![],
1065            vec![(3, vec![(false, "世界")])],
1066            Some(BitVec::from_slice(&[0b1111])),
1067        )
1068        .await;
1069        assert_eq!(row_ids, Some(rows([2, 3])));
1070
1071        let row_ids = applier_factory(
1072            vec![],
1073            vec![(3, vec![(false, "世界")])],
1074            Some(BitVec::from_slice(&[0b1011])), // row 2 is filtered out
1075        )
1076        .await;
1077        assert_eq!(row_ids, Some(rows([3])));
1078    }
1079
1080    #[tokio::test]
1081    async fn test_fulltext_index_multi_terms_case_sensitive_tantivy() {
1082        let applier_factory = build_fulltext_applier_factory(
1083            "test_fulltext_index_multi_terms_case_sensitive_tantivy_",
1084            FulltextBackend::Tantivy,
1085            &[
1086                (Some("Hello"), None, None),
1087                (Some("World"), None, None),
1088                (None, None, None),
1089                (Some("Hello, World"), None, None),
1090            ],
1091        )
1092        .await;
1093
1094        let row_ids = applier_factory(
1095            vec![],
1096            vec![(1, vec![(false, "hello"), (false, "world")])],
1097            None,
1098        )
1099        .await;
1100        assert_eq!(row_ids, Some(rows([])));
1101
1102        let row_ids = applier_factory(
1103            vec![],
1104            vec![(1, vec![(false, "Hello"), (false, "World")])],
1105            None,
1106        )
1107        .await;
1108        assert_eq!(row_ids, Some(rows([3])));
1109
1110        let row_ids = applier_factory(
1111            vec![],
1112            vec![(1, vec![(true, "Hello"), (false, "World")])],
1113            None,
1114        )
1115        .await;
1116        assert_eq!(row_ids, Some(rows([1, 3])));
1117
1118        let row_ids = applier_factory(
1119            vec![],
1120            vec![(1, vec![(false, "Hello"), (true, "World")])],
1121            None,
1122        )
1123        .await;
1124        assert_eq!(row_ids, Some(rows([0, 3])));
1125
1126        let row_ids = applier_factory(
1127            vec![],
1128            vec![(1, vec![(true, "Hello"), (true, "World")])],
1129            None,
1130        )
1131        .await;
1132        assert_eq!(row_ids, None);
1133    }
1134
1135    #[tokio::test]
1136    async fn test_fulltext_index_multi_terms_case_sensitive_bloom() {
1137        let applier_factory = build_fulltext_applier_factory(
1138            "test_fulltext_index_multi_terms_case_sensitive_bloom_",
1139            FulltextBackend::Bloom,
1140            &[
1141                (Some("Hello"), None, None),
1142                (Some("World"), None, None),
1143                (None, None, None),
1144                (Some("Hello, World"), None, None),
1145            ],
1146        )
1147        .await;
1148
1149        let row_ids = applier_factory(
1150            vec![],
1151            vec![(1, vec![(false, "hello"), (false, "world")])],
1152            Some(BitVec::from_slice(&[0b1111])),
1153        )
1154        .await;
1155        assert_eq!(row_ids, Some(rows([])));
1156
1157        let row_ids = applier_factory(
1158            vec![],
1159            vec![(1, vec![(false, "Hello"), (false, "World")])],
1160            Some(BitVec::from_slice(&[0b1111])),
1161        )
1162        .await;
1163        assert_eq!(row_ids, Some(rows([3])));
1164
1165        let row_ids = applier_factory(
1166            vec![],
1167            vec![(1, vec![(true, "Hello"), (false, "World")])],
1168            Some(BitVec::from_slice(&[0b1111])),
1169        )
1170        .await;
1171        assert_eq!(row_ids, Some(rows([1, 3])));
1172
1173        let row_ids = applier_factory(
1174            vec![],
1175            vec![(1, vec![(false, "Hello"), (true, "World")])],
1176            Some(BitVec::from_slice(&[0b1111])),
1177        )
1178        .await;
1179        assert_eq!(row_ids, Some(rows([0, 3])));
1180
1181        let row_ids = applier_factory(
1182            vec![],
1183            vec![(1, vec![(true, "Hello"), (true, "World")])],
1184            Some(BitVec::from_slice(&[0b1111])),
1185        )
1186        .await;
1187        assert_eq!(row_ids, None);
1188    }
1189
1190    #[tokio::test]
1191    async fn test_fulltext_index_multi_terms_case_insensitive_tantivy() {
1192        let applier_factory = build_fulltext_applier_factory(
1193            "test_fulltext_index_multi_terms_case_insensitive_tantivy_",
1194            FulltextBackend::Tantivy,
1195            &[
1196                (None, Some("hello"), None),
1197                (None, None, None),
1198                (None, Some("world"), None),
1199                (None, Some("Hello, World"), None),
1200            ],
1201        )
1202        .await;
1203
1204        let row_ids = applier_factory(
1205            vec![],
1206            vec![(2, vec![(false, "hello"), (false, "world")])],
1207            None,
1208        )
1209        .await;
1210        assert_eq!(row_ids, Some(rows([3])));
1211
1212        let row_ids = applier_factory(
1213            vec![],
1214            vec![(2, vec![(true, "hello"), (false, "world")])],
1215            None,
1216        )
1217        .await;
1218        assert_eq!(row_ids, Some(rows([3])));
1219
1220        let row_ids = applier_factory(
1221            vec![],
1222            vec![(2, vec![(false, "hello"), (true, "world")])],
1223            None,
1224        )
1225        .await;
1226        assert_eq!(row_ids, Some(rows([3])));
1227
1228        let row_ids = applier_factory(
1229            vec![],
1230            vec![(2, vec![(true, "hello"), (true, "world")])],
1231            None,
1232        )
1233        .await;
1234        assert_eq!(row_ids, Some(rows([3])));
1235    }
1236
1237    #[tokio::test]
1238    async fn test_fulltext_index_multi_terms_case_insensitive_bloom() {
1239        let applier_factory = build_fulltext_applier_factory(
1240            "test_fulltext_index_multi_terms_case_insensitive_bloom_",
1241            FulltextBackend::Bloom,
1242            &[
1243                (None, Some("hello"), None),
1244                (None, None, None),
1245                (None, Some("world"), None),
1246                (None, Some("Hello, World"), None),
1247            ],
1248        )
1249        .await;
1250
1251        let row_ids = applier_factory(
1252            vec![],
1253            vec![(2, vec![(false, "hello"), (false, "world")])],
1254            Some(BitVec::from_slice(&[0b1111])),
1255        )
1256        .await;
1257        assert_eq!(row_ids, Some(rows([3])));
1258
1259        let row_ids = applier_factory(
1260            vec![],
1261            vec![(2, vec![(true, "hello"), (false, "world")])],
1262            Some(BitVec::from_slice(&[0b1111])),
1263        )
1264        .await;
1265        assert_eq!(row_ids, Some(rows([3])));
1266
1267        let row_ids = applier_factory(
1268            vec![],
1269            vec![(2, vec![(false, "hello"), (true, "world")])],
1270            Some(BitVec::from_slice(&[0b1111])),
1271        )
1272        .await;
1273        assert_eq!(row_ids, Some(rows([3])));
1274
1275        let row_ids = applier_factory(
1276            vec![],
1277            vec![(2, vec![(true, "hello"), (true, "world")])],
1278            Some(BitVec::from_slice(&[0b1111])),
1279        )
1280        .await;
1281        assert_eq!(row_ids, Some(rows([3])));
1282    }
1283
1284    #[tokio::test]
1285    async fn test_fulltext_index_multi_columns_tantivy() {
1286        let applier_factory = build_fulltext_applier_factory(
1287            "test_fulltext_index_multi_columns_tantivy_",
1288            FulltextBackend::Tantivy,
1289            &[
1290                (Some("Hello"), None, Some("你好")),
1291                (Some("World"), Some("world"), None),
1292                (None, Some("World"), Some("世界")),
1293                (
1294                    Some("Hello, World"),
1295                    Some("Hello, World"),
1296                    Some("你好,世界"),
1297                ),
1298            ],
1299        )
1300        .await;
1301
1302        let row_ids = applier_factory(
1303            vec![(1, "Hello"), (3, "你好")],
1304            vec![(2, vec![(false, "world")])],
1305            None,
1306        )
1307        .await;
1308        assert_eq!(row_ids, Some(rows([3])));
1309
1310        let row_ids =
1311            applier_factory(vec![(2, "World")], vec![(1, vec![(false, "World")])], None).await;
1312        assert_eq!(row_ids, Some(rows([1, 3])));
1313    }
1314
1315    #[tokio::test]
1316    async fn test_fulltext_index_multi_columns_bloom() {
1317        let applier_factory = build_fulltext_applier_factory(
1318            "test_fulltext_index_multi_columns_bloom_",
1319            FulltextBackend::Bloom,
1320            &[
1321                (Some("Hello"), None, Some("你好")),
1322                (Some("World"), Some("world"), None),
1323                (None, Some("World"), Some("世界")),
1324                (
1325                    Some("Hello, World"),
1326                    Some("Hello, World"),
1327                    Some("你好,世界"),
1328                ),
1329            ],
1330        )
1331        .await;
1332
1333        let row_ids = applier_factory(
1334            vec![],
1335            vec![
1336                (1, vec![(false, "Hello")]),
1337                (2, vec![(false, "world")]),
1338                (3, vec![(false, "你好")]),
1339            ],
1340            Some(BitVec::from_slice(&[0b1111])),
1341        )
1342        .await;
1343        assert_eq!(row_ids, Some(rows([3])));
1344
1345        let row_ids = applier_factory(
1346            vec![],
1347            vec![(1, vec![(false, "World")]), (2, vec![(false, "World")])],
1348            Some(BitVec::from_slice(&[0b1111])),
1349        )
1350        .await;
1351        assert_eq!(row_ids, Some(rows([1, 3])));
1352    }
1353}