mito2/sst/index/fulltext_index/
creator.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::sync::Arc;
17use std::sync::atomic::AtomicUsize;
18
19use common_telemetry::warn;
20use datatypes::arrow::array::{Array, StringArray};
21use datatypes::arrow::datatypes::DataType;
22use datatypes::arrow::record_batch::RecordBatch;
23use datatypes::schema::{FulltextAnalyzer, FulltextBackend};
24use index::fulltext_index::create::{
25    BloomFilterFulltextIndexCreator, FulltextIndexCreator, TantivyFulltextIndexCreator,
26};
27use index::fulltext_index::{Analyzer, Config};
28use puffin::blob_metadata::CompressionCodec;
29use puffin::puffin_manager::PutOptions;
30use snafu::{ResultExt, ensure};
31use store_api::metadata::RegionMetadataRef;
32use store_api::storage::{ColumnId, ConcreteDataType, FileId, RegionId};
33
34use crate::error::{
35    CastVectorSnafu, ComputeArrowSnafu, CreateFulltextCreatorSnafu, DataTypeMismatchSnafu,
36    FulltextFinishSnafu, FulltextPushTextSnafu, IndexOptionsSnafu, OperateAbortedIndexSnafu,
37    Result,
38};
39use crate::read::Batch;
40use crate::sst::index::TYPE_FULLTEXT_INDEX;
41use crate::sst::index::fulltext_index::{INDEX_BLOB_TYPE_BLOOM, INDEX_BLOB_TYPE_TANTIVY};
42use crate::sst::index::intermediate::{
43    IntermediateLocation, IntermediateManager, TempFileProvider,
44};
45use crate::sst::index::puffin_manager::SstPuffinWriter;
46use crate::sst::index::statistics::{ByteCount, RowCount, Statistics};
47
/// `FulltextIndexer` is responsible for creating fulltext indexes for SST files.
///
/// It owns one [`SingleCreator`] per indexed column and forwards every batch
/// to all of them.
pub struct FulltextIndexer {
    /// Creators for each column, keyed by column ID.
    creators: HashMap<ColumnId, SingleCreator>,
    /// Whether the index creation was aborted.
    /// Once set, `update`, `update_flat` and `finish` fail with an
    /// "operate aborted index" error.
    aborted: bool,
    /// Statistics of index creation.
    stats: Statistics,
}
57
58impl FulltextIndexer {
59    /// Creates a new `FulltextIndexer`.
60    pub async fn new(
61        region_id: &RegionId,
62        sst_file_id: &FileId,
63        intermediate_manager: &IntermediateManager,
64        metadata: &RegionMetadataRef,
65        compress: bool,
66        mem_limit: usize,
67    ) -> Result<Option<Self>> {
68        let mut creators = HashMap::new();
69
70        for column in &metadata.column_metadatas {
71            let options = column
72                .column_schema
73                .fulltext_options()
74                .context(IndexOptionsSnafu {
75                    column_name: &column.column_schema.name,
76                })?;
77
78            // Relax the type constraint here as many types can be casted to string.
79
80            let options = match options {
81                Some(options) if options.enable => options,
82                _ => continue,
83            };
84
85            let column_id = column.column_id;
86            let intm_path = intermediate_manager.fulltext_path(region_id, sst_file_id, column_id);
87
88            let config = Config {
89                analyzer: match options.analyzer {
90                    FulltextAnalyzer::English => Analyzer::English,
91                    FulltextAnalyzer::Chinese => Analyzer::Chinese,
92                },
93                case_sensitive: options.case_sensitive,
94            };
95
96            let inner = match options.backend {
97                FulltextBackend::Tantivy => {
98                    let creator = TantivyFulltextIndexCreator::new(&intm_path, config, mem_limit)
99                        .await
100                        .context(CreateFulltextCreatorSnafu)?;
101                    AltFulltextCreator::Tantivy(creator)
102                }
103                FulltextBackend::Bloom => {
104                    let temp_file_provider = Arc::new(TempFileProvider::new(
105                        IntermediateLocation::new(&metadata.region_id, sst_file_id),
106                        intermediate_manager.clone(),
107                    ));
108                    let global_memory_usage = Arc::new(AtomicUsize::new(0));
109                    let creator = BloomFilterFulltextIndexCreator::new(
110                        config,
111                        options.granularity as _,
112                        options.false_positive_rate(),
113                        temp_file_provider,
114                        global_memory_usage,
115                        Some(mem_limit),
116                    );
117                    AltFulltextCreator::Bloom(creator)
118                }
119            };
120
121            creators.insert(
122                column_id,
123                SingleCreator {
124                    column_id,
125                    column_name: column.column_schema.name.clone(),
126                    inner,
127                    compress,
128                },
129            );
130        }
131
132        Ok((!creators.is_empty()).then(move || Self {
133            creators,
134            aborted: false,
135            stats: Statistics::new(TYPE_FULLTEXT_INDEX),
136        }))
137    }
138
139    /// Updates the index with the given batch.
140    pub async fn update(&mut self, batch: &mut Batch) -> Result<()> {
141        ensure!(!self.aborted, OperateAbortedIndexSnafu);
142
143        if let Err(update_err) = self.do_update(batch).await {
144            if let Err(err) = self.do_abort().await {
145                if cfg!(any(test, feature = "test")) {
146                    panic!("Failed to abort index creator, err: {err}");
147                } else {
148                    warn!(err; "Failed to abort index creator");
149                }
150            }
151            return Err(update_err);
152        }
153
154        Ok(())
155    }
156
157    /// Updates the fulltext index with the given flat format RecordBatch.
158    pub async fn update_flat(&mut self, batch: &RecordBatch) -> Result<()> {
159        ensure!(!self.aborted, OperateAbortedIndexSnafu);
160
161        if batch.num_rows() == 0 {
162            return Ok(());
163        }
164
165        if let Err(update_err) = self.do_update_flat(batch).await {
166            if let Err(err) = self.do_abort().await {
167                if cfg!(any(test, feature = "test")) {
168                    panic!("Failed to abort index creator, err: {err}");
169                } else {
170                    warn!(err; "Failed to abort index creator");
171                }
172            }
173            return Err(update_err);
174        }
175
176        Ok(())
177    }
178
179    /// Finalizes the index creation.
180    pub async fn finish(
181        &mut self,
182        puffin_writer: &mut SstPuffinWriter,
183    ) -> Result<(RowCount, ByteCount)> {
184        ensure!(!self.aborted, OperateAbortedIndexSnafu);
185
186        match self.do_finish(puffin_writer).await {
187            Ok(()) => Ok((self.stats.row_count(), self.stats.byte_count())),
188            Err(finish_err) => {
189                if let Err(err) = self.do_abort().await {
190                    if cfg!(any(test, feature = "test")) {
191                        panic!("Failed to abort index creator, err: {err}");
192                    } else {
193                        warn!(err; "Failed to abort index creator");
194                    }
195                }
196                Err(finish_err)
197            }
198        }
199    }
200
201    /// Aborts the index creation.
202    pub async fn abort(&mut self) -> Result<()> {
203        if self.aborted {
204            return Ok(());
205        }
206
207        self.do_abort().await
208    }
209
210    /// Returns the memory usage of the index creator.
211    pub fn memory_usage(&self) -> usize {
212        self.creators.values().map(|c| c.inner.memory_usage()).sum()
213    }
214
215    /// Returns IDs of columns that the creator is responsible for.
216    pub fn column_ids(&self) -> impl Iterator<Item = ColumnId> + '_ {
217        self.creators.keys().copied()
218    }
219}
220
221impl FulltextIndexer {
222    async fn do_update(&mut self, batch: &mut Batch) -> Result<()> {
223        let mut guard = self.stats.record_update();
224        guard.inc_row_count(batch.num_rows());
225
226        for creator in self.creators.values_mut() {
227            creator.update(batch).await?;
228        }
229
230        Ok(())
231    }
232
233    async fn do_update_flat(&mut self, batch: &RecordBatch) -> Result<()> {
234        let mut guard = self.stats.record_update();
235        guard.inc_row_count(batch.num_rows());
236
237        for creator in self.creators.values_mut() {
238            creator.update_flat(batch).await?;
239        }
240
241        Ok(())
242    }
243
244    async fn do_finish(&mut self, puffin_writer: &mut SstPuffinWriter) -> Result<()> {
245        let mut guard = self.stats.record_finish();
246
247        let mut written_bytes = 0;
248        for creator in self.creators.values_mut() {
249            written_bytes += creator.finish(puffin_writer).await?;
250        }
251
252        guard.inc_byte_count(written_bytes);
253        Ok(())
254    }
255
256    async fn do_abort(&mut self) -> Result<()> {
257        let _guard = self.stats.record_cleanup();
258
259        self.aborted = true;
260
261        for (_, mut creator) in self.creators.drain() {
262            creator.abort().await?;
263        }
264
265        Ok(())
266    }
267}
268
/// `SingleCreator` is a creator for a single column.
///
/// It pushes exactly one text per row into the inner creator; null values and
/// missing columns are pushed as empty texts.
struct SingleCreator {
    /// Column ID. Used to locate the column in a `Batch` and to build the blob key.
    column_id: ColumnId,
    /// Column name. Used to locate the column in a flat `RecordBatch`.
    column_name: String,
    /// Inner creator.
    inner: AltFulltextCreator,
    /// Whether the index should be compressed (Zstd when `true`).
    compress: bool,
}
280
281impl SingleCreator {
282    async fn update(&mut self, batch: &mut Batch) -> Result<()> {
283        let text_column = batch
284            .fields()
285            .iter()
286            .find(|c| c.column_id == self.column_id);
287        match text_column {
288            Some(column) => {
289                let data = column
290                    .data
291                    .cast(&ConcreteDataType::string_datatype())
292                    .context(CastVectorSnafu {
293                        from: column.data.data_type(),
294                        to: ConcreteDataType::string_datatype(),
295                    })?;
296
297                for i in 0..batch.num_rows() {
298                    let data = data.get_ref(i);
299                    let text = data
300                        .as_string()
301                        .context(DataTypeMismatchSnafu)?
302                        .unwrap_or_default();
303                    self.inner.push_text(text).await?;
304                }
305            }
306            _ => {
307                // If the column is not found in the batch, push empty text.
308                // Ensure that the number of texts pushed is the same as the number of rows in the SST,
309                // so that the texts are aligned with the row ids.
310                for _ in 0..batch.num_rows() {
311                    self.inner.push_text("").await?;
312                }
313            }
314        }
315
316        Ok(())
317    }
318
319    async fn update_flat(&mut self, batch: &RecordBatch) -> Result<()> {
320        // Find the column in the RecordBatch by name
321        if let Some(column_array) = batch.column_by_name(&self.column_name) {
322            // Convert Arrow array to string array.
323            // TODO(yingwen): Use Utf8View later if possible.
324            let array = datatypes::arrow::compute::cast(column_array, &DataType::Utf8)
325                .context(ComputeArrowSnafu)?;
326            let string_array = array.as_any().downcast_ref::<StringArray>().unwrap();
327            for text_opt in string_array.iter() {
328                let text = text_opt.unwrap_or_default();
329                self.inner.push_text(text).await?;
330            }
331        } else {
332            // If the column is not found in the batch, push empty text.
333            // Ensure that the number of texts pushed is the same as the number of rows in the SST,
334            // so that the texts are aligned with the row ids.
335            for _ in 0..batch.num_rows() {
336                self.inner.push_text("").await?;
337            }
338        }
339
340        Ok(())
341    }
342
343    async fn finish(&mut self, puffin_writer: &mut SstPuffinWriter) -> Result<ByteCount> {
344        let options = PutOptions {
345            compression: self.compress.then_some(CompressionCodec::Zstd),
346        };
347        self.inner
348            .finish(puffin_writer, &self.column_id, options)
349            .await
350    }
351
352    async fn abort(&mut self) -> Result<()> {
353        self.inner.abort(&self.column_id).await;
354        Ok(())
355    }
356}
357
#[allow(dead_code, clippy::large_enum_variant)]
/// `AltFulltextCreator` is an alternative fulltext index creator that can be either Tantivy or BloomFilter.
///
/// The variant is chosen per column from the backend configured in its fulltext options.
enum AltFulltextCreator {
    /// Creator backed by a Tantivy index.
    Tantivy(TantivyFulltextIndexCreator),
    /// Creator backed by a bloom filter.
    Bloom(BloomFilterFulltextIndexCreator),
}
364
365impl AltFulltextCreator {
366    async fn push_text(&mut self, text: &str) -> Result<()> {
367        match self {
368            Self::Tantivy(creator) => creator.push_text(text).await.context(FulltextPushTextSnafu),
369            Self::Bloom(creator) => creator.push_text(text).await.context(FulltextPushTextSnafu),
370        }
371    }
372
373    fn memory_usage(&self) -> usize {
374        match self {
375            Self::Tantivy(creator) => creator.memory_usage(),
376            Self::Bloom(creator) => creator.memory_usage(),
377        }
378    }
379
380    async fn finish(
381        &mut self,
382        puffin_writer: &mut SstPuffinWriter,
383        column_id: &ColumnId,
384        put_options: PutOptions,
385    ) -> Result<ByteCount> {
386        match self {
387            Self::Tantivy(creator) => {
388                let key = format!("{INDEX_BLOB_TYPE_TANTIVY}-{}", column_id);
389                creator
390                    .finish(puffin_writer, &key, put_options)
391                    .await
392                    .context(FulltextFinishSnafu)
393            }
394            Self::Bloom(creator) => {
395                let key = format!("{INDEX_BLOB_TYPE_BLOOM}-{}", column_id);
396                creator
397                    .finish(puffin_writer, &key, put_options)
398                    .await
399                    .context(FulltextFinishSnafu)
400            }
401        }
402    }
403
404    async fn abort(&mut self, column_id: &ColumnId) {
405        match self {
406            Self::Tantivy(creator) => {
407                if let Err(err) = creator.abort().await {
408                    warn!(err; "Failed to abort the fulltext index creator in the Tantivy flavor, col_id: {:?}", column_id);
409                }
410            }
411            Self::Bloom(creator) => {
412                if let Err(err) = creator.abort().await {
413                    warn!(err; "Failed to abort the fulltext index creator in the Bloom Filter flavor, col_id: {:?}", column_id);
414                }
415            }
416        }
417    }
418}
419
420#[cfg(test)]
421mod tests {
422    use std::collections::{BTreeMap, BTreeSet};
423    use std::sync::Arc;
424
425    use api::v1::SemanticType;
426    use common_base::BitVec;
427    use datatypes::data_type::DataType;
428    use datatypes::schema::{ColumnSchema, FulltextAnalyzer, FulltextOptions};
429    use datatypes::vectors::{UInt8Vector, UInt64Vector};
430    use futures::FutureExt;
431    use futures::future::BoxFuture;
432    use index::fulltext_index::search::RowId;
433    use object_store::ObjectStore;
434    use object_store::services::Memory;
435    use puffin::puffin_manager::{PuffinManager, PuffinWriter};
436    use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder, RegionMetadataRef};
437    use store_api::region_request::PathType;
438    use store_api::storage::{ConcreteDataType, FileId, RegionId};
439
440    use super::*;
441    use crate::access_layer::RegionFilePathFactory;
442    use crate::read::{Batch, BatchColumn};
443    use crate::sst::file::RegionFileId;
444    use crate::sst::index::fulltext_index::applier::FulltextIndexApplier;
445    use crate::sst::index::fulltext_index::applier::builder::{
446        FulltextQuery, FulltextRequest, FulltextTerm,
447    };
448    use crate::sst::index::puffin_manager::PuffinManagerFactory;
449
450    fn mock_object_store() -> ObjectStore {
451        ObjectStore::new(Memory::default()).unwrap().finish()
452    }
453
454    async fn new_intm_mgr(path: impl AsRef<str>) -> IntermediateManager {
455        IntermediateManager::init_fs(path).await.unwrap()
456    }
457
458    fn mock_region_metadata(backend: FulltextBackend) -> RegionMetadataRef {
459        let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 2));
460        builder
461            .push_column_metadata(ColumnMetadata {
462                column_schema: ColumnSchema::new(
463                    "text_english_case_sensitive",
464                    ConcreteDataType::string_datatype(),
465                    true,
466                )
467                .with_fulltext_options(FulltextOptions::new_unchecked(
468                    true,
469                    FulltextAnalyzer::English,
470                    true,
471                    backend.clone(),
472                    1,
473                    0.01,
474                ))
475                .unwrap(),
476                semantic_type: SemanticType::Field,
477                column_id: 1,
478            })
479            .push_column_metadata(ColumnMetadata {
480                column_schema: ColumnSchema::new(
481                    "text_english_case_insensitive",
482                    ConcreteDataType::string_datatype(),
483                    true,
484                )
485                .with_fulltext_options(FulltextOptions::new_unchecked(
486                    true,
487                    FulltextAnalyzer::English,
488                    false,
489                    backend.clone(),
490                    1,
491                    0.01,
492                ))
493                .unwrap(),
494                semantic_type: SemanticType::Field,
495                column_id: 2,
496            })
497            .push_column_metadata(ColumnMetadata {
498                column_schema: ColumnSchema::new(
499                    "text_chinese",
500                    ConcreteDataType::string_datatype(),
501                    true,
502                )
503                .with_fulltext_options(FulltextOptions::new_unchecked(
504                    true,
505                    FulltextAnalyzer::Chinese,
506                    false,
507                    backend.clone(),
508                    1,
509                    0.01,
510                ))
511                .unwrap(),
512                semantic_type: SemanticType::Field,
513                column_id: 3,
514            })
515            .push_column_metadata(ColumnMetadata {
516                column_schema: ColumnSchema::new(
517                    "ts",
518                    ConcreteDataType::timestamp_millisecond_datatype(),
519                    false,
520                ),
521                semantic_type: SemanticType::Timestamp,
522                column_id: 4,
523            });
524
525        Arc::new(builder.build().unwrap())
526    }
527
528    fn new_batch(
529        rows: &[(
530            Option<&str>, // text_english_case_sensitive
531            Option<&str>, // text_english_case_insensitive
532            Option<&str>, // text_chinese
533        )],
534    ) -> Batch {
535        let mut vec_english_sensitive =
536            ConcreteDataType::string_datatype().create_mutable_vector(0);
537        let mut vec_english_insensitive =
538            ConcreteDataType::string_datatype().create_mutable_vector(0);
539        let mut vec_chinese = ConcreteDataType::string_datatype().create_mutable_vector(0);
540
541        for (text_english_case_sensitive, text_english_case_insensitive, text_chinese) in rows {
542            match text_english_case_sensitive {
543                Some(s) => vec_english_sensitive.push_value_ref(&(*s).into()),
544                None => vec_english_sensitive.push_null(),
545            }
546            match text_english_case_insensitive {
547                Some(s) => vec_english_insensitive.push_value_ref(&(*s).into()),
548                None => vec_english_insensitive.push_null(),
549            }
550            match text_chinese {
551                Some(s) => vec_chinese.push_value_ref(&(*s).into()),
552                None => vec_chinese.push_null(),
553            }
554        }
555
556        let num_rows = vec_english_sensitive.len();
557        Batch::new(
558            vec![],
559            Arc::new(UInt64Vector::from_iter_values(
560                (0..num_rows).map(|n| n as u64),
561            )),
562            Arc::new(UInt64Vector::from_iter_values(std::iter::repeat_n(
563                0, num_rows,
564            ))),
565            Arc::new(UInt8Vector::from_iter_values(std::iter::repeat_n(
566                1, num_rows,
567            ))),
568            vec![
569                BatchColumn {
570                    column_id: 1,
571                    data: vec_english_sensitive.to_vector(),
572                },
573                BatchColumn {
574                    column_id: 2,
575                    data: vec_english_insensitive.to_vector(),
576                },
577                BatchColumn {
578                    column_id: 3,
579                    data: vec_chinese.to_vector(),
580                },
581            ],
582        )
583        .unwrap()
584    }
585
    /// Applier factory that can handle both queries and terms.
    ///
    /// It builds a fulltext index with the given data rows, and returns a function
    /// that can handle both queries and terms in a single request.
    ///
    /// The function takes three parameters:
    /// - `queries`: A list of (ColumnId, query_string) pairs for fulltext queries
    /// - `terms`: A list of (ColumnId, [(bool, String)]) for fulltext terms, where the bool is
    ///   passed through as `FulltextTerm::col_lowered`
    /// - `coarse_mask`: Only used by the Bloom backend, where it selects the row groups to search;
    ///   the Tantivy backend ignores it
    ///
    /// The returned future resolves to the set of matching row ids, or `None` when the applier
    /// returns no result.
    async fn build_fulltext_applier_factory(
        prefix: &str,
        backend: FulltextBackend,
        rows: &[(
            Option<&str>, // text_english_case_sensitive
            Option<&str>, // text_english_case_insensitive
            Option<&str>, // text_chinese
        )],
    ) -> impl Fn(
        Vec<(ColumnId, &str)>,
        Vec<(ColumnId, Vec<(bool, &str)>)>,
        Option<BitVec>,
    ) -> BoxFuture<'static, Option<BTreeSet<RowId>>> {
        let (d, factory) = PuffinManagerFactory::new_for_test_async(prefix).await;
        let table_dir = "table0".to_string();
        let sst_file_id = FileId::random();
        let object_store = mock_object_store();
        let region_metadata = mock_region_metadata(backend.clone());
        let intm_mgr = new_intm_mgr(d.path().to_string_lossy()).await;

        // Build the index: compression enabled, memory limit of 1024.
        let mut indexer = FulltextIndexer::new(
            &region_metadata.region_id,
            &sst_file_id,
            &intm_mgr,
            &region_metadata,
            true,
            1024,
        )
        .await
        .unwrap()
        .unwrap();

        let mut batch = new_batch(rows);
        indexer.update(&mut batch).await.unwrap();

        // Write the index blobs into the puffin file of the SST.
        let puffin_manager = factory.build(
            object_store.clone(),
            RegionFilePathFactory::new(table_dir.clone(), PathType::Bare),
        );
        let region_file_id = RegionFileId::new(region_metadata.region_id, sst_file_id);
        let mut writer = puffin_manager.writer(&region_file_id).await.unwrap();
        let _ = indexer.finish(&mut writer).await.unwrap();
        writer.finish().await.unwrap();

        move |queries: Vec<(ColumnId, &str)>,
              terms_requests: Vec<(ColumnId, Vec<(bool, &str)>)>,
              coarse_mask: Option<BitVec>| {
            // Referencing `d` moves it into the closure, so it lives as long as the
            // returned function (presumably to keep the test directory alive — confirm).
            let _d = &d;
            let table_dir = table_dir.clone();
            let object_store = object_store.clone();
            let factory = factory.clone();

            // Queries and terms for the same column are merged into one request.
            let mut requests: BTreeMap<ColumnId, FulltextRequest> = BTreeMap::new();

            // Add queries
            for (column_id, query) in queries {
                requests
                    .entry(column_id)
                    .or_default()
                    .queries
                    .push(FulltextQuery(query.to_string()));
            }

            // Add terms
            for (column_id, terms) in terms_requests {
                let fulltext_terms = terms
                    .into_iter()
                    .map(|(col_lowered, term)| FulltextTerm {
                        col_lowered,
                        term: term.to_string(),
                    })
                    .collect::<Vec<_>>();

                requests
                    .entry(column_id)
                    .or_default()
                    .terms
                    .extend(fulltext_terms);
            }

            let applier = FulltextIndexApplier::new(
                table_dir,
                PathType::Bare,
                object_store,
                requests,
                factory,
            );

            let backend = backend.clone();
            async move {
                match backend {
                    FulltextBackend::Tantivy => {
                        applier.apply_fine(region_file_id, None).await.unwrap()
                    }
                    FulltextBackend::Bloom => {
                        // A missing mask becomes an empty one, i.e. no row groups to search.
                        let coarse_mask = coarse_mask.unwrap_or_default();
                        // One entry per mask bit: `(1, bit)`; presumably (rows in group, selected).
                        let row_groups = (0..coarse_mask.len()).map(|i| (1, coarse_mask[i]));
                        // row group id == row id
                        let resp = applier
                            .apply_coarse(region_file_id, None, row_groups)
                            .await
                            .unwrap();
                        // Keep only row groups that still have matching ranges.
                        resp.map(|r| {
                            r.into_iter()
                                .filter(|(_, ranges)| !ranges.is_empty())
                                .map(|(row_group_id, _)| row_group_id as RowId)
                                .collect()
                        })
                    }
                }
            }
            .boxed()
        }
    }
708
709    fn rows(row_ids: impl IntoIterator<Item = RowId>) -> BTreeSet<RowId> {
710        row_ids.into_iter().collect()
711    }
712
713    #[tokio::test]
714    async fn test_fulltext_index_basic_case_sensitive_tantivy() {
715        let applier_factory = build_fulltext_applier_factory(
716            "test_fulltext_index_basic_case_sensitive_tantivy_",
717            FulltextBackend::Tantivy,
718            &[
719                (Some("hello"), None, None),
720                (Some("world"), None, None),
721                (None, None, None),
722                (Some("Hello, World"), None, None),
723            ],
724        )
725        .await;
726
727        let row_ids = applier_factory(vec![(1, "hello")], vec![], None).await;
728        assert_eq!(row_ids, Some(rows([0])));
729
730        let row_ids = applier_factory(vec![(1, "world")], vec![], None).await;
731        assert_eq!(row_ids, Some(rows([1])));
732
733        let row_ids = applier_factory(vec![(1, "Hello")], vec![], None).await;
734        assert_eq!(row_ids, Some(rows([3])));
735
736        let row_ids = applier_factory(vec![(1, "World")], vec![], None).await;
737        assert_eq!(row_ids, Some(rows([3])));
738
739        let row_ids = applier_factory(vec![], vec![(1, vec![(false, "hello")])], None).await;
740        assert_eq!(row_ids, Some(rows([0])));
741
742        let row_ids = applier_factory(vec![], vec![(1, vec![(true, "hello")])], None).await;
743        assert_eq!(row_ids, None);
744
745        let row_ids = applier_factory(vec![], vec![(1, vec![(false, "world")])], None).await;
746        assert_eq!(row_ids, Some(rows([1])));
747
748        let row_ids = applier_factory(vec![], vec![(1, vec![(true, "world")])], None).await;
749        assert_eq!(row_ids, None);
750
751        let row_ids = applier_factory(vec![], vec![(1, vec![(false, "Hello")])], None).await;
752        assert_eq!(row_ids, Some(rows([3])));
753
754        let row_ids = applier_factory(vec![], vec![(1, vec![(true, "Hello")])], None).await;
755        assert_eq!(row_ids, None);
756
757        let row_ids = applier_factory(vec![], vec![(1, vec![(false, "Hello, World")])], None).await;
758        assert_eq!(row_ids, Some(rows([3])));
759
760        let row_ids = applier_factory(vec![], vec![(1, vec![(true, "Hello, World")])], None).await;
761        assert_eq!(row_ids, None);
762    }
763
764    #[tokio::test]
765    async fn test_fulltext_index_basic_case_sensitive_bloom() {
766        let applier_factory = build_fulltext_applier_factory(
767            "test_fulltext_index_basic_case_sensitive_bloom_",
768            FulltextBackend::Bloom,
769            &[
770                (Some("hello"), None, None),
771                (Some("world"), None, None),
772                (None, None, None),
773                (Some("Hello, World"), None, None),
774            ],
775        )
776        .await;
777
778        let row_ids = applier_factory(
779            vec![],
780            vec![(1, vec![(false, "hello")])],
781            Some(BitVec::from_slice(&[0b1111])),
782        )
783        .await;
784        assert_eq!(row_ids, Some(rows([0])));
785
786        let row_ids = applier_factory(
787            vec![],
788            vec![(1, vec![(false, "hello")])],
789            Some(BitVec::from_slice(&[0b1110])), // row 0 is filtered out
790        )
791        .await;
792        assert_eq!(row_ids, Some(rows([])));
793
794        let row_ids = applier_factory(
795            vec![],
796            vec![(1, vec![(true, "hello")])],
797            Some(BitVec::from_slice(&[0b1111])),
798        )
799        .await;
800        assert_eq!(row_ids, None);
801
802        let row_ids = applier_factory(
803            vec![],
804            vec![(1, vec![(false, "world")])],
805            Some(BitVec::from_slice(&[0b1111])),
806        )
807        .await;
808        assert_eq!(row_ids, Some(rows([1])));
809
810        let row_ids = applier_factory(
811            vec![],
812            vec![(1, vec![(false, "world")])],
813            Some(BitVec::from_slice(&[0b1101])), // row 1 is filtered out
814        )
815        .await;
816        assert_eq!(row_ids, Some(rows([])));
817
818        let row_ids = applier_factory(
819            vec![],
820            vec![(1, vec![(true, "world")])],
821            Some(BitVec::from_slice(&[0b1111])),
822        )
823        .await;
824        assert_eq!(row_ids, None);
825
826        let row_ids = applier_factory(
827            vec![],
828            vec![(1, vec![(false, "Hello")])],
829            Some(BitVec::from_slice(&[0b1111])),
830        )
831        .await;
832        assert_eq!(row_ids, Some(rows([3])));
833
834        let row_ids = applier_factory(
835            vec![],
836            vec![(1, vec![(false, "Hello")])],
837            Some(BitVec::from_slice(&[0b0111])), // row 3 is filtered out
838        )
839        .await;
840        assert_eq!(row_ids, Some(rows([])));
841
842        let row_ids = applier_factory(
843            vec![],
844            vec![(1, vec![(true, "Hello")])],
845            Some(BitVec::from_slice(&[0b1111])),
846        )
847        .await;
848        assert_eq!(row_ids, None);
849
850        let row_ids = applier_factory(
851            vec![],
852            vec![(1, vec![(false, "Hello, World")])],
853            Some(BitVec::from_slice(&[0b1111])),
854        )
855        .await;
856        assert_eq!(row_ids, Some(rows([3])));
857
858        let row_ids = applier_factory(
859            vec![],
860            vec![(1, vec![(false, "Hello, World")])],
861            Some(BitVec::from_slice(&[0b0111])), // row 3 is filtered out
862        )
863        .await;
864        assert_eq!(row_ids, Some(rows([])));
865
866        let row_ids = applier_factory(
867            vec![],
868            vec![(1, vec![(true, "Hello, World")])],
869            Some(BitVec::from_slice(&[0b1111])),
870        )
871        .await;
872        assert_eq!(row_ids, None);
873    }
874
875    #[tokio::test]
876    async fn test_fulltext_index_basic_case_insensitive_tantivy() {
877        let applier_factory = build_fulltext_applier_factory(
878            "test_fulltext_index_basic_case_insensitive_tantivy_",
879            FulltextBackend::Tantivy,
880            &[
881                (None, Some("hello"), None),
882                (None, None, None),
883                (None, Some("world"), None),
884                (None, Some("Hello, World"), None),
885            ],
886        )
887        .await;
888
889        let row_ids = applier_factory(vec![(2, "hello")], vec![], None).await;
890        assert_eq!(row_ids, Some(rows([0, 3])));
891
892        let row_ids = applier_factory(vec![(2, "world")], vec![], None).await;
893        assert_eq!(row_ids, Some(rows([2, 3])));
894
895        let row_ids = applier_factory(vec![(2, "Hello")], vec![], None).await;
896        assert_eq!(row_ids, Some(rows([0, 3])));
897
898        let row_ids = applier_factory(vec![(2, "World")], vec![], None).await;
899        assert_eq!(row_ids, Some(rows([2, 3])));
900
901        let row_ids = applier_factory(vec![], vec![(2, vec![(false, "hello")])], None).await;
902        assert_eq!(row_ids, Some(rows([0, 3])));
903
904        let row_ids = applier_factory(vec![], vec![(2, vec![(true, "hello")])], None).await;
905        assert_eq!(row_ids, Some(rows([0, 3])));
906
907        let row_ids = applier_factory(vec![], vec![(2, vec![(false, "world")])], None).await;
908        assert_eq!(row_ids, Some(rows([2, 3])));
909
910        let row_ids = applier_factory(vec![], vec![(2, vec![(true, "world")])], None).await;
911        assert_eq!(row_ids, Some(rows([2, 3])));
912
913        let row_ids = applier_factory(vec![], vec![(2, vec![(false, "Hello")])], None).await;
914        assert_eq!(row_ids, Some(rows([0, 3])));
915
916        let row_ids = applier_factory(vec![], vec![(2, vec![(true, "Hello")])], None).await;
917        assert_eq!(row_ids, Some(rows([0, 3])));
918
919        let row_ids = applier_factory(vec![], vec![(2, vec![(false, "World")])], None).await;
920        assert_eq!(row_ids, Some(rows([2, 3])));
921
922        let row_ids = applier_factory(vec![], vec![(2, vec![(true, "World")])], None).await;
923        assert_eq!(row_ids, Some(rows([2, 3])));
924    }
925
926    #[tokio::test]
927    async fn test_fulltext_index_basic_case_insensitive_bloom() {
928        let applier_factory = build_fulltext_applier_factory(
929            "test_fulltext_index_basic_case_insensitive_bloom_",
930            FulltextBackend::Bloom,
931            &[
932                (None, Some("hello"), None),
933                (None, None, None),
934                (None, Some("world"), None),
935                (None, Some("Hello, World"), None),
936            ],
937        )
938        .await;
939
940        let row_ids = applier_factory(
941            vec![],
942            vec![(2, vec![(false, "hello")])],
943            Some(BitVec::from_slice(&[0b1111])),
944        )
945        .await;
946        assert_eq!(row_ids, Some(rows([0, 3])));
947
948        let row_ids = applier_factory(
949            vec![],
950            vec![(2, vec![(false, "hello")])],
951            Some(BitVec::from_slice(&[0b1110])), // row 0 is filtered out
952        )
953        .await;
954        assert_eq!(row_ids, Some(rows([3])));
955
956        let row_ids = applier_factory(
957            vec![],
958            vec![(2, vec![(true, "hello")])],
959            Some(BitVec::from_slice(&[0b1111])),
960        )
961        .await;
962        assert_eq!(row_ids, Some(rows([0, 3])));
963
964        let row_ids = applier_factory(
965            vec![],
966            vec![(2, vec![(true, "hello")])],
967            Some(BitVec::from_slice(&[0b1110])), // row 0 is filtered out
968        )
969        .await;
970        assert_eq!(row_ids, Some(rows([3])));
971
972        let row_ids = applier_factory(
973            vec![],
974            vec![(2, vec![(false, "world")])],
975            Some(BitVec::from_slice(&[0b1111])),
976        )
977        .await;
978        assert_eq!(row_ids, Some(rows([2, 3])));
979
980        let row_ids = applier_factory(
981            vec![],
982            vec![(2, vec![(false, "world")])],
983            Some(BitVec::from_slice(&[0b1011])), // row 2 is filtered out
984        )
985        .await;
986        assert_eq!(row_ids, Some(rows([3])));
987
988        let row_ids = applier_factory(
989            vec![],
990            vec![(2, vec![(true, "world")])],
991            Some(BitVec::from_slice(&[0b1111])),
992        )
993        .await;
994        assert_eq!(row_ids, Some(rows([2, 3])));
995
996        let row_ids = applier_factory(
997            vec![],
998            vec![(2, vec![(true, "world")])],
999            Some(BitVec::from_slice(&[0b1011])), // row 2 is filtered out
1000        )
1001        .await;
1002        assert_eq!(row_ids, Some(rows([3])));
1003
1004        let row_ids = applier_factory(
1005            vec![],
1006            vec![(2, vec![(false, "Hello")])],
1007            Some(BitVec::from_slice(&[0b1111])),
1008        )
1009        .await;
1010        assert_eq!(row_ids, Some(rows([0, 3])));
1011
1012        let row_ids = applier_factory(
1013            vec![],
1014            vec![(2, vec![(false, "Hello")])],
1015            Some(BitVec::from_slice(&[0b0111])), // row 3 is filtered out
1016        )
1017        .await;
1018        assert_eq!(row_ids, Some(rows([0])));
1019
1020        let row_ids = applier_factory(
1021            vec![],
1022            vec![(2, vec![(true, "Hello")])],
1023            Some(BitVec::from_slice(&[0b1111])),
1024        )
1025        .await;
1026        assert_eq!(row_ids, Some(rows([0, 3])));
1027
1028        let row_ids = applier_factory(
1029            vec![],
1030            vec![(2, vec![(true, "Hello")])],
1031            Some(BitVec::from_slice(&[0b1110])), // row 0 is filtered out
1032        )
1033        .await;
1034        assert_eq!(row_ids, Some(rows([3])));
1035
1036        let row_ids = applier_factory(
1037            vec![],
1038            vec![(2, vec![(false, "World")])],
1039            Some(BitVec::from_slice(&[0b1111])),
1040        )
1041        .await;
1042        assert_eq!(row_ids, Some(rows([2, 3])));
1043
1044        let row_ids = applier_factory(
1045            vec![],
1046            vec![(2, vec![(false, "World")])],
1047            Some(BitVec::from_slice(&[0b0111])), // row 3 is filtered out
1048        )
1049        .await;
1050        assert_eq!(row_ids, Some(rows([2])));
1051
1052        let row_ids = applier_factory(
1053            vec![],
1054            vec![(2, vec![(true, "World")])],
1055            Some(BitVec::from_slice(&[0b1111])),
1056        )
1057        .await;
1058        assert_eq!(row_ids, Some(rows([2, 3])));
1059
1060        let row_ids = applier_factory(
1061            vec![],
1062            vec![(2, vec![(true, "World")])],
1063            Some(BitVec::from_slice(&[0b1011])), // row 2 is filtered out
1064        )
1065        .await;
1066        assert_eq!(row_ids, Some(rows([3])));
1067    }
1068
1069    #[tokio::test]
1070    async fn test_fulltext_index_basic_chinese_tantivy() {
1071        let applier_factory = build_fulltext_applier_factory(
1072            "test_fulltext_index_basic_chinese_tantivy_",
1073            FulltextBackend::Tantivy,
1074            &[
1075                (None, None, Some("你好")),
1076                (None, None, None),
1077                (None, None, Some("世界")),
1078                (None, None, Some("你好,世界")),
1079            ],
1080        )
1081        .await;
1082
1083        let row_ids = applier_factory(vec![(3, "你好")], vec![], None).await;
1084        assert_eq!(row_ids, Some(rows([0, 3])));
1085
1086        let row_ids = applier_factory(vec![(3, "世界")], vec![], None).await;
1087        assert_eq!(row_ids, Some(rows([2, 3])));
1088
1089        let row_ids = applier_factory(vec![], vec![(3, vec![(false, "你好")])], None).await;
1090        assert_eq!(row_ids, Some(rows([0, 3])));
1091
1092        let row_ids = applier_factory(vec![], vec![(3, vec![(false, "世界")])], None).await;
1093        assert_eq!(row_ids, Some(rows([2, 3])));
1094    }
1095
1096    #[tokio::test]
1097    async fn test_fulltext_index_basic_chinese_bloom() {
1098        let applier_factory = build_fulltext_applier_factory(
1099            "test_fulltext_index_basic_chinese_bloom_",
1100            FulltextBackend::Bloom,
1101            &[
1102                (None, None, Some("你好")),
1103                (None, None, None),
1104                (None, None, Some("世界")),
1105                (None, None, Some("你好,世界")),
1106            ],
1107        )
1108        .await;
1109
1110        let row_ids = applier_factory(
1111            vec![],
1112            vec![(3, vec![(false, "你好")])],
1113            Some(BitVec::from_slice(&[0b1111])),
1114        )
1115        .await;
1116        assert_eq!(row_ids, Some(rows([0, 3])));
1117
1118        let row_ids = applier_factory(
1119            vec![],
1120            vec![(3, vec![(false, "你好")])],
1121            Some(BitVec::from_slice(&[0b1110])), // row 0 is filtered out
1122        )
1123        .await;
1124        assert_eq!(row_ids, Some(rows([3])));
1125
1126        let row_ids = applier_factory(
1127            vec![],
1128            vec![(3, vec![(false, "世界")])],
1129            Some(BitVec::from_slice(&[0b1111])),
1130        )
1131        .await;
1132        assert_eq!(row_ids, Some(rows([2, 3])));
1133
1134        let row_ids = applier_factory(
1135            vec![],
1136            vec![(3, vec![(false, "世界")])],
1137            Some(BitVec::from_slice(&[0b1011])), // row 2 is filtered out
1138        )
1139        .await;
1140        assert_eq!(row_ids, Some(rows([3])));
1141    }
1142
1143    #[tokio::test]
1144    async fn test_fulltext_index_multi_terms_case_sensitive_tantivy() {
1145        let applier_factory = build_fulltext_applier_factory(
1146            "test_fulltext_index_multi_terms_case_sensitive_tantivy_",
1147            FulltextBackend::Tantivy,
1148            &[
1149                (Some("Hello"), None, None),
1150                (Some("World"), None, None),
1151                (None, None, None),
1152                (Some("Hello, World"), None, None),
1153            ],
1154        )
1155        .await;
1156
1157        let row_ids = applier_factory(
1158            vec![],
1159            vec![(1, vec![(false, "hello"), (false, "world")])],
1160            None,
1161        )
1162        .await;
1163        assert_eq!(row_ids, Some(rows([])));
1164
1165        let row_ids = applier_factory(
1166            vec![],
1167            vec![(1, vec![(false, "Hello"), (false, "World")])],
1168            None,
1169        )
1170        .await;
1171        assert_eq!(row_ids, Some(rows([3])));
1172
1173        let row_ids = applier_factory(
1174            vec![],
1175            vec![(1, vec![(true, "Hello"), (false, "World")])],
1176            None,
1177        )
1178        .await;
1179        assert_eq!(row_ids, Some(rows([1, 3])));
1180
1181        let row_ids = applier_factory(
1182            vec![],
1183            vec![(1, vec![(false, "Hello"), (true, "World")])],
1184            None,
1185        )
1186        .await;
1187        assert_eq!(row_ids, Some(rows([0, 3])));
1188
1189        let row_ids = applier_factory(
1190            vec![],
1191            vec![(1, vec![(true, "Hello"), (true, "World")])],
1192            None,
1193        )
1194        .await;
1195        assert_eq!(row_ids, None);
1196    }
1197
1198    #[tokio::test]
1199    async fn test_fulltext_index_multi_terms_case_sensitive_bloom() {
1200        let applier_factory = build_fulltext_applier_factory(
1201            "test_fulltext_index_multi_terms_case_sensitive_bloom_",
1202            FulltextBackend::Bloom,
1203            &[
1204                (Some("Hello"), None, None),
1205                (Some("World"), None, None),
1206                (None, None, None),
1207                (Some("Hello, World"), None, None),
1208            ],
1209        )
1210        .await;
1211
1212        let row_ids = applier_factory(
1213            vec![],
1214            vec![(1, vec![(false, "hello"), (false, "world")])],
1215            Some(BitVec::from_slice(&[0b1111])),
1216        )
1217        .await;
1218        assert_eq!(row_ids, Some(rows([])));
1219
1220        let row_ids = applier_factory(
1221            vec![],
1222            vec![(1, vec![(false, "Hello"), (false, "World")])],
1223            Some(BitVec::from_slice(&[0b1111])),
1224        )
1225        .await;
1226        assert_eq!(row_ids, Some(rows([3])));
1227
1228        let row_ids = applier_factory(
1229            vec![],
1230            vec![(1, vec![(true, "Hello"), (false, "World")])],
1231            Some(BitVec::from_slice(&[0b1111])),
1232        )
1233        .await;
1234        assert_eq!(row_ids, Some(rows([1, 3])));
1235
1236        let row_ids = applier_factory(
1237            vec![],
1238            vec![(1, vec![(false, "Hello"), (true, "World")])],
1239            Some(BitVec::from_slice(&[0b1111])),
1240        )
1241        .await;
1242        assert_eq!(row_ids, Some(rows([0, 3])));
1243
1244        let row_ids = applier_factory(
1245            vec![],
1246            vec![(1, vec![(true, "Hello"), (true, "World")])],
1247            Some(BitVec::from_slice(&[0b1111])),
1248        )
1249        .await;
1250        assert_eq!(row_ids, None);
1251    }
1252
1253    #[tokio::test]
1254    async fn test_fulltext_index_multi_terms_case_insensitive_tantivy() {
1255        let applier_factory = build_fulltext_applier_factory(
1256            "test_fulltext_index_multi_terms_case_insensitive_tantivy_",
1257            FulltextBackend::Tantivy,
1258            &[
1259                (None, Some("hello"), None),
1260                (None, None, None),
1261                (None, Some("world"), None),
1262                (None, Some("Hello, World"), None),
1263            ],
1264        )
1265        .await;
1266
1267        let row_ids = applier_factory(
1268            vec![],
1269            vec![(2, vec![(false, "hello"), (false, "world")])],
1270            None,
1271        )
1272        .await;
1273        assert_eq!(row_ids, Some(rows([3])));
1274
1275        let row_ids = applier_factory(
1276            vec![],
1277            vec![(2, vec![(true, "hello"), (false, "world")])],
1278            None,
1279        )
1280        .await;
1281        assert_eq!(row_ids, Some(rows([3])));
1282
1283        let row_ids = applier_factory(
1284            vec![],
1285            vec![(2, vec![(false, "hello"), (true, "world")])],
1286            None,
1287        )
1288        .await;
1289        assert_eq!(row_ids, Some(rows([3])));
1290
1291        let row_ids = applier_factory(
1292            vec![],
1293            vec![(2, vec![(true, "hello"), (true, "world")])],
1294            None,
1295        )
1296        .await;
1297        assert_eq!(row_ids, Some(rows([3])));
1298    }
1299
1300    #[tokio::test]
1301    async fn test_fulltext_index_multi_terms_case_insensitive_bloom() {
1302        let applier_factory = build_fulltext_applier_factory(
1303            "test_fulltext_index_multi_terms_case_insensitive_bloom_",
1304            FulltextBackend::Bloom,
1305            &[
1306                (None, Some("hello"), None),
1307                (None, None, None),
1308                (None, Some("world"), None),
1309                (None, Some("Hello, World"), None),
1310            ],
1311        )
1312        .await;
1313
1314        let row_ids = applier_factory(
1315            vec![],
1316            vec![(2, vec![(false, "hello"), (false, "world")])],
1317            Some(BitVec::from_slice(&[0b1111])),
1318        )
1319        .await;
1320        assert_eq!(row_ids, Some(rows([3])));
1321
1322        let row_ids = applier_factory(
1323            vec![],
1324            vec![(2, vec![(true, "hello"), (false, "world")])],
1325            Some(BitVec::from_slice(&[0b1111])),
1326        )
1327        .await;
1328        assert_eq!(row_ids, Some(rows([3])));
1329
1330        let row_ids = applier_factory(
1331            vec![],
1332            vec![(2, vec![(false, "hello"), (true, "world")])],
1333            Some(BitVec::from_slice(&[0b1111])),
1334        )
1335        .await;
1336        assert_eq!(row_ids, Some(rows([3])));
1337
1338        let row_ids = applier_factory(
1339            vec![],
1340            vec![(2, vec![(true, "hello"), (true, "world")])],
1341            Some(BitVec::from_slice(&[0b1111])),
1342        )
1343        .await;
1344        assert_eq!(row_ids, Some(rows([3])));
1345    }
1346
1347    #[tokio::test]
1348    async fn test_fulltext_index_multi_columns_tantivy() {
1349        let applier_factory = build_fulltext_applier_factory(
1350            "test_fulltext_index_multi_columns_tantivy_",
1351            FulltextBackend::Tantivy,
1352            &[
1353                (Some("Hello"), None, Some("你好")),
1354                (Some("World"), Some("world"), None),
1355                (None, Some("World"), Some("世界")),
1356                (
1357                    Some("Hello, World"),
1358                    Some("Hello, World"),
1359                    Some("你好,世界"),
1360                ),
1361            ],
1362        )
1363        .await;
1364
1365        let row_ids = applier_factory(
1366            vec![(1, "Hello"), (3, "你好")],
1367            vec![(2, vec![(false, "world")])],
1368            None,
1369        )
1370        .await;
1371        assert_eq!(row_ids, Some(rows([3])));
1372
1373        let row_ids =
1374            applier_factory(vec![(2, "World")], vec![(1, vec![(false, "World")])], None).await;
1375        assert_eq!(row_ids, Some(rows([1, 3])));
1376    }
1377
1378    #[tokio::test]
1379    async fn test_fulltext_index_multi_columns_bloom() {
1380        let applier_factory = build_fulltext_applier_factory(
1381            "test_fulltext_index_multi_columns_bloom_",
1382            FulltextBackend::Bloom,
1383            &[
1384                (Some("Hello"), None, Some("你好")),
1385                (Some("World"), Some("world"), None),
1386                (None, Some("World"), Some("世界")),
1387                (
1388                    Some("Hello, World"),
1389                    Some("Hello, World"),
1390                    Some("你好,世界"),
1391                ),
1392            ],
1393        )
1394        .await;
1395
1396        let row_ids = applier_factory(
1397            vec![],
1398            vec![
1399                (1, vec![(false, "Hello")]),
1400                (2, vec![(false, "world")]),
1401                (3, vec![(false, "你好")]),
1402            ],
1403            Some(BitVec::from_slice(&[0b1111])),
1404        )
1405        .await;
1406        assert_eq!(row_ids, Some(rows([3])));
1407
1408        let row_ids = applier_factory(
1409            vec![],
1410            vec![(1, vec![(false, "World")]), (2, vec![(false, "World")])],
1411            Some(BitVec::from_slice(&[0b1111])),
1412        )
1413        .await;
1414        assert_eq!(row_ids, Some(rows([1, 3])));
1415    }
1416}