mito2/sst/index/fulltext_index/
creator.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::sync::Arc;
17use std::sync::atomic::AtomicUsize;
18
19use api::v1::SemanticType;
20use common_telemetry::warn;
21use datatypes::arrow::array::{Array, LargeStringArray, StringArray};
22use datatypes::arrow::datatypes::DataType;
23use datatypes::arrow::record_batch::RecordBatch;
24use datatypes::schema::{FulltextAnalyzer, FulltextBackend};
25use index::fulltext_index::create::{
26    BloomFilterFulltextIndexCreator, FulltextIndexCreator, TantivyFulltextIndexCreator,
27};
28use index::fulltext_index::{Analyzer, Config};
29use index::target::IndexTarget;
30use puffin::blob_metadata::CompressionCodec;
31use puffin::puffin_manager::PutOptions;
32use snafu::{ResultExt, ensure};
33use store_api::metadata::RegionMetadataRef;
34use store_api::storage::{ColumnId, ConcreteDataType, FileId, RegionId};
35
36use crate::error::{
37    CastVectorSnafu, ComputeArrowSnafu, CreateFulltextCreatorSnafu, DataTypeMismatchSnafu,
38    FulltextFinishSnafu, FulltextPushTextSnafu, IndexOptionsSnafu, OperateAbortedIndexSnafu,
39    Result,
40};
41use crate::read::Batch;
42use crate::sst::index::TYPE_FULLTEXT_INDEX;
43use crate::sst::index::fulltext_index::{INDEX_BLOB_TYPE_BLOOM, INDEX_BLOB_TYPE_TANTIVY};
44use crate::sst::index::intermediate::{
45    IntermediateLocation, IntermediateManager, TempFileProvider,
46};
47use crate::sst::index::puffin_manager::SstPuffinWriter;
48use crate::sst::index::statistics::{ByteCount, RowCount, Statistics};
49
50/// `FulltextIndexer` is responsible for creating fulltext indexes for SST files.
51pub struct FulltextIndexer {
52    /// Creators for each column.
53    creators: HashMap<ColumnId, SingleCreator>,
54    /// Whether the index creation was aborted.
55    aborted: bool,
56    /// Statistics of index creation.
57    stats: Statistics,
58}
59
60impl FulltextIndexer {
61    /// Creates a new `FulltextIndexer`.
62    pub async fn new(
63        region_id: &RegionId,
64        sst_file_id: &FileId,
65        intermediate_manager: &IntermediateManager,
66        metadata: &RegionMetadataRef,
67        compress: bool,
68        mem_limit: usize,
69    ) -> Result<Option<Self>> {
70        let mut creators = HashMap::new();
71
72        for column in &metadata.column_metadatas {
73            // Tag columns don't support fulltext index now.
74            // If we need to support fulltext index for tag columns, we also need to parse
75            // the codec and handle sparse encoding for flat format specially.
76            if column.semantic_type == SemanticType::Tag {
77                common_telemetry::debug!(
78                    "Skip creating fulltext index for tag column {}",
79                    column.column_schema.name
80                );
81                continue;
82            }
83
84            let options = column
85                .column_schema
86                .fulltext_options()
87                .context(IndexOptionsSnafu {
88                    column_name: &column.column_schema.name,
89                })?;
90
91            // Relax the type constraint here as many types can be casted to string.
92
93            let options = match options {
94                Some(options) if options.enable => options,
95                _ => continue,
96            };
97
98            let column_id = column.column_id;
99            let intm_path = intermediate_manager.fulltext_path(region_id, sst_file_id, column_id);
100
101            let config = Config {
102                analyzer: match options.analyzer {
103                    FulltextAnalyzer::English => Analyzer::English,
104                    FulltextAnalyzer::Chinese => Analyzer::Chinese,
105                },
106                case_sensitive: options.case_sensitive,
107            };
108
109            let inner = match options.backend {
110                FulltextBackend::Tantivy => {
111                    let creator = TantivyFulltextIndexCreator::new(&intm_path, config, mem_limit)
112                        .await
113                        .context(CreateFulltextCreatorSnafu)?;
114                    AltFulltextCreator::Tantivy(creator)
115                }
116                FulltextBackend::Bloom => {
117                    let temp_file_provider = Arc::new(TempFileProvider::new(
118                        IntermediateLocation::new(&metadata.region_id, sst_file_id),
119                        intermediate_manager.clone(),
120                    ));
121                    let global_memory_usage = Arc::new(AtomicUsize::new(0));
122                    let creator = BloomFilterFulltextIndexCreator::new(
123                        config,
124                        options.granularity as _,
125                        options.false_positive_rate(),
126                        temp_file_provider,
127                        global_memory_usage,
128                        Some(mem_limit),
129                    );
130                    AltFulltextCreator::Bloom(creator)
131                }
132            };
133
134            creators.insert(
135                column_id,
136                SingleCreator {
137                    column_id,
138                    column_name: column.column_schema.name.clone(),
139                    inner,
140                    compress,
141                },
142            );
143        }
144
145        Ok((!creators.is_empty()).then(move || Self {
146            creators,
147            aborted: false,
148            stats: Statistics::new(TYPE_FULLTEXT_INDEX),
149        }))
150    }
151
152    /// Updates the index with the given batch.
153    pub async fn update(&mut self, batch: &mut Batch) -> Result<()> {
154        ensure!(!self.aborted, OperateAbortedIndexSnafu);
155
156        if let Err(update_err) = self.do_update(batch).await {
157            if let Err(err) = self.do_abort().await {
158                if cfg!(any(test, feature = "test")) {
159                    panic!("Failed to abort index creator, err: {err}");
160                } else {
161                    warn!(err; "Failed to abort index creator");
162                }
163            }
164            return Err(update_err);
165        }
166
167        Ok(())
168    }
169
170    /// Updates the fulltext index with the given flat format RecordBatch.
171    pub async fn update_flat(&mut self, batch: &RecordBatch) -> Result<()> {
172        ensure!(!self.aborted, OperateAbortedIndexSnafu);
173
174        if batch.num_rows() == 0 {
175            return Ok(());
176        }
177
178        if let Err(update_err) = self.do_update_flat(batch).await {
179            if let Err(err) = self.do_abort().await {
180                if cfg!(any(test, feature = "test")) {
181                    panic!("Failed to abort index creator, err: {err}");
182                } else {
183                    warn!(err; "Failed to abort index creator");
184                }
185            }
186            return Err(update_err);
187        }
188
189        Ok(())
190    }
191
192    /// Finalizes the index creation.
193    pub async fn finish(
194        &mut self,
195        puffin_writer: &mut SstPuffinWriter,
196    ) -> Result<(RowCount, ByteCount)> {
197        ensure!(!self.aborted, OperateAbortedIndexSnafu);
198
199        match self.do_finish(puffin_writer).await {
200            Ok(()) => Ok((self.stats.row_count(), self.stats.byte_count())),
201            Err(finish_err) => {
202                if let Err(err) = self.do_abort().await {
203                    if cfg!(any(test, feature = "test")) {
204                        panic!("Failed to abort index creator, err: {err}");
205                    } else {
206                        warn!(err; "Failed to abort index creator");
207                    }
208                }
209                Err(finish_err)
210            }
211        }
212    }
213
214    /// Aborts the index creation.
215    pub async fn abort(&mut self) -> Result<()> {
216        if self.aborted {
217            return Ok(());
218        }
219
220        self.do_abort().await
221    }
222
223    /// Returns the memory usage of the index creator.
224    pub fn memory_usage(&self) -> usize {
225        self.creators.values().map(|c| c.inner.memory_usage()).sum()
226    }
227
228    /// Returns IDs of columns that the creator is responsible for.
229    pub fn column_ids(&self) -> impl Iterator<Item = ColumnId> + '_ {
230        self.creators.keys().copied()
231    }
232}
233
234impl FulltextIndexer {
235    async fn do_update(&mut self, batch: &mut Batch) -> Result<()> {
236        let mut guard = self.stats.record_update();
237        guard.inc_row_count(batch.num_rows());
238
239        for creator in self.creators.values_mut() {
240            creator.update(batch).await?;
241        }
242
243        Ok(())
244    }
245
246    async fn do_update_flat(&mut self, batch: &RecordBatch) -> Result<()> {
247        let mut guard = self.stats.record_update();
248        guard.inc_row_count(batch.num_rows());
249
250        for creator in self.creators.values_mut() {
251            creator.update_flat(batch).await?;
252        }
253
254        Ok(())
255    }
256
257    async fn do_finish(&mut self, puffin_writer: &mut SstPuffinWriter) -> Result<()> {
258        let mut guard = self.stats.record_finish();
259
260        let mut written_bytes = 0;
261        for creator in self.creators.values_mut() {
262            written_bytes += creator.finish(puffin_writer).await?;
263        }
264
265        guard.inc_byte_count(written_bytes);
266        Ok(())
267    }
268
269    async fn do_abort(&mut self) -> Result<()> {
270        let _guard = self.stats.record_cleanup();
271
272        self.aborted = true;
273
274        for (_, mut creator) in self.creators.drain() {
275            creator.abort().await?;
276        }
277
278        Ok(())
279    }
280}
281
282/// `SingleCreator` is a creator for a single column.
283struct SingleCreator {
284    /// Column ID.
285    column_id: ColumnId,
286    /// Column name.
287    column_name: String,
288    /// Inner creator.
289    inner: AltFulltextCreator,
290    /// Whether the index should be compressed.
291    compress: bool,
292}
293
294impl SingleCreator {
295    async fn update(&mut self, batch: &mut Batch) -> Result<()> {
296        let text_column = batch
297            .fields()
298            .iter()
299            .find(|c| c.column_id == self.column_id);
300        match text_column {
301            Some(column) => {
302                let data = column
303                    .data
304                    .cast(&ConcreteDataType::string_datatype())
305                    .context(CastVectorSnafu {
306                        from: column.data.data_type(),
307                        to: ConcreteDataType::string_datatype(),
308                    })?;
309
310                for i in 0..batch.num_rows() {
311                    let data = data.get_ref(i);
312                    let text = data
313                        .try_into_string()
314                        .context(DataTypeMismatchSnafu)?
315                        .unwrap_or_default();
316                    self.inner.push_text(text).await?;
317                }
318            }
319            _ => {
320                // If the column is not found in the batch, push empty text.
321                // Ensure that the number of texts pushed is the same as the number of rows in the SST,
322                // so that the texts are aligned with the row ids.
323                for _ in 0..batch.num_rows() {
324                    self.inner.push_text("").await?;
325                }
326            }
327        }
328
329        Ok(())
330    }
331
332    async fn update_flat(&mut self, batch: &RecordBatch) -> Result<()> {
333        // Find the column in the RecordBatch by name
334        if let Some(column_array) = batch.column_by_name(&self.column_name) {
335            // Convert Arrow array to string array.
336            // TODO(yingwen): Use Utf8View later if possible.
337            match column_array.data_type() {
338                DataType::Utf8 => {
339                    let string_array = column_array.as_any().downcast_ref::<StringArray>().unwrap();
340                    for text_opt in string_array.iter() {
341                        let text = text_opt.unwrap_or_default();
342                        self.inner.push_text(text).await?;
343                    }
344                }
345                DataType::LargeUtf8 => {
346                    let large_string_array = column_array
347                        .as_any()
348                        .downcast_ref::<LargeStringArray>()
349                        .unwrap();
350                    for text_opt in large_string_array.iter() {
351                        let text = text_opt.unwrap_or_default();
352                        self.inner.push_text(text).await?;
353                    }
354                }
355                _ => {
356                    // For other types, cast to Utf8 as before
357                    let array = datatypes::arrow::compute::cast(column_array, &DataType::Utf8)
358                        .context(ComputeArrowSnafu)?;
359                    let string_array = array.as_any().downcast_ref::<StringArray>().unwrap();
360                    for text_opt in string_array.iter() {
361                        let text = text_opt.unwrap_or_default();
362                        self.inner.push_text(text).await?;
363                    }
364                }
365            }
366        } else {
367            // If the column is not found in the batch, push empty text.
368            // Ensure that the number of texts pushed is the same as the number of rows in the SST,
369            // so that the texts are aligned with the row ids.
370            for _ in 0..batch.num_rows() {
371                self.inner.push_text("").await?;
372            }
373        }
374
375        Ok(())
376    }
377
378    async fn finish(&mut self, puffin_writer: &mut SstPuffinWriter) -> Result<ByteCount> {
379        let options = PutOptions {
380            compression: self.compress.then_some(CompressionCodec::Zstd),
381        };
382        self.inner
383            .finish(puffin_writer, &self.column_id, options)
384            .await
385    }
386
387    async fn abort(&mut self) -> Result<()> {
388        self.inner.abort(&self.column_id).await;
389        Ok(())
390    }
391}
392
393#[allow(dead_code, clippy::large_enum_variant)]
394/// `AltFulltextCreator` is an alternative fulltext index creator that can be either Tantivy or BloomFilter.
395enum AltFulltextCreator {
396    Tantivy(TantivyFulltextIndexCreator),
397    Bloom(BloomFilterFulltextIndexCreator),
398}
399
400impl AltFulltextCreator {
401    async fn push_text(&mut self, text: &str) -> Result<()> {
402        match self {
403            Self::Tantivy(creator) => creator.push_text(text).await.context(FulltextPushTextSnafu),
404            Self::Bloom(creator) => creator.push_text(text).await.context(FulltextPushTextSnafu),
405        }
406    }
407
408    fn memory_usage(&self) -> usize {
409        match self {
410            Self::Tantivy(creator) => creator.memory_usage(),
411            Self::Bloom(creator) => creator.memory_usage(),
412        }
413    }
414
415    async fn finish(
416        &mut self,
417        puffin_writer: &mut SstPuffinWriter,
418        column_id: &ColumnId,
419        put_options: PutOptions,
420    ) -> Result<ByteCount> {
421        match self {
422            Self::Tantivy(creator) => {
423                let blob_key = format!(
424                    "{INDEX_BLOB_TYPE_TANTIVY}-{}",
425                    IndexTarget::ColumnId(*column_id)
426                );
427                creator
428                    .finish(puffin_writer, &blob_key, put_options)
429                    .await
430                    .context(FulltextFinishSnafu)
431            }
432            Self::Bloom(creator) => {
433                let blob_key = format!(
434                    "{INDEX_BLOB_TYPE_BLOOM}-{}",
435                    IndexTarget::ColumnId(*column_id)
436                );
437                creator
438                    .finish(puffin_writer, &blob_key, put_options)
439                    .await
440                    .context(FulltextFinishSnafu)
441            }
442        }
443    }
444
445    async fn abort(&mut self, column_id: &ColumnId) {
446        match self {
447            Self::Tantivy(creator) => {
448                if let Err(err) = creator.abort().await {
449                    warn!(err; "Failed to abort the fulltext index creator in the Tantivy flavor, col_id: {:?}", column_id);
450                }
451            }
452            Self::Bloom(creator) => {
453                if let Err(err) = creator.abort().await {
454                    warn!(err; "Failed to abort the fulltext index creator in the Bloom Filter flavor, col_id: {:?}", column_id);
455                }
456            }
457        }
458    }
459}
460
461#[cfg(test)]
462mod tests {
463    use std::collections::{BTreeMap, BTreeSet};
464    use std::sync::Arc;
465
466    use api::v1::SemanticType;
467    use common_base::BitVec;
468    use datatypes::data_type::DataType;
469    use datatypes::schema::{ColumnSchema, FulltextAnalyzer, FulltextOptions};
470    use datatypes::vectors::{UInt8Vector, UInt64Vector};
471    use futures::FutureExt;
472    use futures::future::BoxFuture;
473    use index::fulltext_index::search::RowId;
474    use object_store::ObjectStore;
475    use object_store::services::Memory;
476    use puffin::puffin_manager::{PuffinManager, PuffinWriter};
477    use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder, RegionMetadataRef};
478    use store_api::region_request::PathType;
479    use store_api::storage::{ConcreteDataType, FileId, RegionId};
480
481    use super::*;
482    use crate::access_layer::RegionFilePathFactory;
483    use crate::read::{Batch, BatchColumn};
484    use crate::sst::file::{RegionFileId, RegionIndexId};
485    use crate::sst::index::fulltext_index::applier::FulltextIndexApplier;
486    use crate::sst::index::fulltext_index::applier::builder::{
487        FulltextQuery, FulltextRequest, FulltextTerm,
488    };
489    use crate::sst::index::puffin_manager::PuffinManagerFactory;
490
491    fn mock_object_store() -> ObjectStore {
492        ObjectStore::new(Memory::default()).unwrap().finish()
493    }
494
495    async fn new_intm_mgr(path: impl AsRef<str>) -> IntermediateManager {
496        IntermediateManager::init_fs(path).await.unwrap()
497    }
498
499    fn mock_region_metadata(backend: FulltextBackend) -> RegionMetadataRef {
500        let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 2));
501        builder
502            .push_column_metadata(ColumnMetadata {
503                column_schema: ColumnSchema::new(
504                    "text_english_case_sensitive",
505                    ConcreteDataType::string_datatype(),
506                    true,
507                )
508                .with_fulltext_options(FulltextOptions::new_unchecked(
509                    true,
510                    FulltextAnalyzer::English,
511                    true,
512                    backend.clone(),
513                    1,
514                    0.01,
515                ))
516                .unwrap(),
517                semantic_type: SemanticType::Field,
518                column_id: 1,
519            })
520            .push_column_metadata(ColumnMetadata {
521                column_schema: ColumnSchema::new(
522                    "text_english_case_insensitive",
523                    ConcreteDataType::string_datatype(),
524                    true,
525                )
526                .with_fulltext_options(FulltextOptions::new_unchecked(
527                    true,
528                    FulltextAnalyzer::English,
529                    false,
530                    backend.clone(),
531                    1,
532                    0.01,
533                ))
534                .unwrap(),
535                semantic_type: SemanticType::Field,
536                column_id: 2,
537            })
538            .push_column_metadata(ColumnMetadata {
539                column_schema: ColumnSchema::new(
540                    "text_chinese",
541                    ConcreteDataType::string_datatype(),
542                    true,
543                )
544                .with_fulltext_options(FulltextOptions::new_unchecked(
545                    true,
546                    FulltextAnalyzer::Chinese,
547                    false,
548                    backend.clone(),
549                    1,
550                    0.01,
551                ))
552                .unwrap(),
553                semantic_type: SemanticType::Field,
554                column_id: 3,
555            })
556            .push_column_metadata(ColumnMetadata {
557                column_schema: ColumnSchema::new(
558                    "ts",
559                    ConcreteDataType::timestamp_millisecond_datatype(),
560                    false,
561                ),
562                semantic_type: SemanticType::Timestamp,
563                column_id: 4,
564            });
565
566        Arc::new(builder.build().unwrap())
567    }
568
569    fn new_batch(
570        rows: &[(
571            Option<&str>, // text_english_case_sensitive
572            Option<&str>, // text_english_case_insensitive
573            Option<&str>, // text_chinese
574        )],
575    ) -> Batch {
576        let mut vec_english_sensitive =
577            ConcreteDataType::string_datatype().create_mutable_vector(0);
578        let mut vec_english_insensitive =
579            ConcreteDataType::string_datatype().create_mutable_vector(0);
580        let mut vec_chinese = ConcreteDataType::string_datatype().create_mutable_vector(0);
581
582        for (text_english_case_sensitive, text_english_case_insensitive, text_chinese) in rows {
583            match text_english_case_sensitive {
584                Some(s) => vec_english_sensitive.push_value_ref(&(*s).into()),
585                None => vec_english_sensitive.push_null(),
586            }
587            match text_english_case_insensitive {
588                Some(s) => vec_english_insensitive.push_value_ref(&(*s).into()),
589                None => vec_english_insensitive.push_null(),
590            }
591            match text_chinese {
592                Some(s) => vec_chinese.push_value_ref(&(*s).into()),
593                None => vec_chinese.push_null(),
594            }
595        }
596
597        let num_rows = vec_english_sensitive.len();
598        Batch::new(
599            vec![],
600            Arc::new(UInt64Vector::from_iter_values(
601                (0..num_rows).map(|n| n as u64),
602            )),
603            Arc::new(UInt64Vector::from_iter_values(std::iter::repeat_n(
604                0, num_rows,
605            ))),
606            Arc::new(UInt8Vector::from_iter_values(std::iter::repeat_n(
607                1, num_rows,
608            ))),
609            vec![
610                BatchColumn {
611                    column_id: 1,
612                    data: vec_english_sensitive.to_vector(),
613                },
614                BatchColumn {
615                    column_id: 2,
616                    data: vec_english_insensitive.to_vector(),
617                },
618                BatchColumn {
619                    column_id: 3,
620                    data: vec_chinese.to_vector(),
621                },
622            ],
623        )
624        .unwrap()
625    }
626
627    /// Applier factory that can handle both queries and terms.
628    ///
629    /// It builds a fulltext index with the given data rows, and returns a function
630    /// that can handle both queries and terms in a single request.
631    ///
632    /// The function takes two parameters:
633    /// - `queries`: A list of (ColumnId, query_string) pairs for fulltext queries
634    /// - `terms`: A list of (ColumnId, [(bool, String)]) for fulltext terms, where bool indicates if term is lowercased
635    async fn build_fulltext_applier_factory(
636        prefix: &str,
637        backend: FulltextBackend,
638        rows: &[(
639            Option<&str>, // text_english_case_sensitive
640            Option<&str>, // text_english_case_insensitive
641            Option<&str>, // text_chinese
642        )],
643    ) -> impl Fn(
644        Vec<(ColumnId, &str)>,
645        Vec<(ColumnId, Vec<(bool, &str)>)>,
646        Option<BitVec>,
647    ) -> BoxFuture<'static, Option<BTreeSet<RowId>>> {
648        let (d, factory) = PuffinManagerFactory::new_for_test_async(prefix).await;
649        let table_dir = "table0".to_string();
650        let sst_file_id = FileId::random();
651        let object_store = mock_object_store();
652        let region_metadata = mock_region_metadata(backend.clone());
653        let intm_mgr = new_intm_mgr(d.path().to_string_lossy()).await;
654
655        let mut indexer = FulltextIndexer::new(
656            &region_metadata.region_id,
657            &sst_file_id,
658            &intm_mgr,
659            &region_metadata,
660            true,
661            1024,
662        )
663        .await
664        .unwrap()
665        .unwrap();
666
667        let mut batch = new_batch(rows);
668        indexer.update(&mut batch).await.unwrap();
669
670        let puffin_manager = factory.build(
671            object_store.clone(),
672            RegionFilePathFactory::new(table_dir.clone(), PathType::Bare),
673        );
674        let region_file_id = RegionFileId::new(region_metadata.region_id, sst_file_id);
675        let index_id = RegionIndexId::new(region_file_id, 0);
676        let mut writer = puffin_manager.writer(&index_id).await.unwrap();
677        let _ = indexer.finish(&mut writer).await.unwrap();
678        writer.finish().await.unwrap();
679
680        move |queries: Vec<(ColumnId, &str)>,
681              terms_requests: Vec<(ColumnId, Vec<(bool, &str)>)>,
682              coarse_mask: Option<BitVec>| {
683            let _d = &d;
684            let table_dir = table_dir.clone();
685            let object_store = object_store.clone();
686            let factory = factory.clone();
687
688            let mut requests: BTreeMap<ColumnId, FulltextRequest> = BTreeMap::new();
689
690            // Add queries
691            for (column_id, query) in queries {
692                requests
693                    .entry(column_id)
694                    .or_default()
695                    .queries
696                    .push(FulltextQuery(query.to_string()));
697            }
698
699            // Add terms
700            for (column_id, terms) in terms_requests {
701                let fulltext_terms = terms
702                    .into_iter()
703                    .map(|(col_lowered, term)| FulltextTerm {
704                        col_lowered,
705                        term: term.to_string(),
706                    })
707                    .collect::<Vec<_>>();
708
709                requests
710                    .entry(column_id)
711                    .or_default()
712                    .terms
713                    .extend(fulltext_terms);
714            }
715
716            let applier = FulltextIndexApplier::new(
717                table_dir,
718                PathType::Bare,
719                object_store,
720                requests,
721                factory,
722            );
723
724            let backend = backend.clone();
725            async move {
726                match backend {
727                    FulltextBackend::Tantivy => {
728                        applier.apply_fine(index_id, None, None).await.unwrap()
729                    }
730                    FulltextBackend::Bloom => {
731                        let coarse_mask = coarse_mask.unwrap_or_default();
732                        let row_groups = (0..coarse_mask.len()).map(|i| (1, coarse_mask[i]));
733                        // row group id == row id
734                        let resp = applier
735                            .apply_coarse(index_id, None, row_groups, None)
736                            .await
737                            .unwrap();
738                        resp.map(|r| {
739                            r.into_iter()
740                                .filter(|(_, ranges)| !ranges.is_empty())
741                                .map(|(row_group_id, _)| row_group_id as RowId)
742                                .collect()
743                        })
744                    }
745                }
746            }
747            .boxed()
748        }
749    }
750
751    fn rows(row_ids: impl IntoIterator<Item = RowId>) -> BTreeSet<RowId> {
752        row_ids.into_iter().collect()
753    }
754
755    #[tokio::test]
756    async fn test_fulltext_index_basic_case_sensitive_tantivy() {
757        let applier_factory = build_fulltext_applier_factory(
758            "test_fulltext_index_basic_case_sensitive_tantivy_",
759            FulltextBackend::Tantivy,
760            &[
761                (Some("hello"), None, None),
762                (Some("world"), None, None),
763                (None, None, None),
764                (Some("Hello, World"), None, None),
765            ],
766        )
767        .await;
768
769        let row_ids = applier_factory(vec![(1, "hello")], vec![], None).await;
770        assert_eq!(row_ids, Some(rows([0])));
771
772        let row_ids = applier_factory(vec![(1, "world")], vec![], None).await;
773        assert_eq!(row_ids, Some(rows([1])));
774
775        let row_ids = applier_factory(vec![(1, "Hello")], vec![], None).await;
776        assert_eq!(row_ids, Some(rows([3])));
777
778        let row_ids = applier_factory(vec![(1, "World")], vec![], None).await;
779        assert_eq!(row_ids, Some(rows([3])));
780
781        let row_ids = applier_factory(vec![], vec![(1, vec![(false, "hello")])], None).await;
782        assert_eq!(row_ids, Some(rows([0])));
783
784        let row_ids = applier_factory(vec![], vec![(1, vec![(true, "hello")])], None).await;
785        assert_eq!(row_ids, None);
786
787        let row_ids = applier_factory(vec![], vec![(1, vec![(false, "world")])], None).await;
788        assert_eq!(row_ids, Some(rows([1])));
789
790        let row_ids = applier_factory(vec![], vec![(1, vec![(true, "world")])], None).await;
791        assert_eq!(row_ids, None);
792
793        let row_ids = applier_factory(vec![], vec![(1, vec![(false, "Hello")])], None).await;
794        assert_eq!(row_ids, Some(rows([3])));
795
796        let row_ids = applier_factory(vec![], vec![(1, vec![(true, "Hello")])], None).await;
797        assert_eq!(row_ids, None);
798
799        let row_ids = applier_factory(vec![], vec![(1, vec![(false, "Hello, World")])], None).await;
800        assert_eq!(row_ids, Some(rows([3])));
801
802        let row_ids = applier_factory(vec![], vec![(1, vec![(true, "Hello, World")])], None).await;
803        assert_eq!(row_ids, None);
804    }
805
806    #[tokio::test]
807    async fn test_fulltext_index_basic_case_sensitive_bloom() {
808        let applier_factory = build_fulltext_applier_factory(
809            "test_fulltext_index_basic_case_sensitive_bloom_",
810            FulltextBackend::Bloom,
811            &[
812                (Some("hello"), None, None),
813                (Some("world"), None, None),
814                (None, None, None),
815                (Some("Hello, World"), None, None),
816            ],
817        )
818        .await;
819
820        let row_ids = applier_factory(
821            vec![],
822            vec![(1, vec![(false, "hello")])],
823            Some(BitVec::from_slice(&[0b1111])),
824        )
825        .await;
826        assert_eq!(row_ids, Some(rows([0])));
827
828        let row_ids = applier_factory(
829            vec![],
830            vec![(1, vec![(false, "hello")])],
831            Some(BitVec::from_slice(&[0b1110])), // row 0 is filtered out
832        )
833        .await;
834        assert_eq!(row_ids, Some(rows([])));
835
836        let row_ids = applier_factory(
837            vec![],
838            vec![(1, vec![(true, "hello")])],
839            Some(BitVec::from_slice(&[0b1111])),
840        )
841        .await;
842        assert_eq!(row_ids, None);
843
844        let row_ids = applier_factory(
845            vec![],
846            vec![(1, vec![(false, "world")])],
847            Some(BitVec::from_slice(&[0b1111])),
848        )
849        .await;
850        assert_eq!(row_ids, Some(rows([1])));
851
852        let row_ids = applier_factory(
853            vec![],
854            vec![(1, vec![(false, "world")])],
855            Some(BitVec::from_slice(&[0b1101])), // row 1 is filtered out
856        )
857        .await;
858        assert_eq!(row_ids, Some(rows([])));
859
860        let row_ids = applier_factory(
861            vec![],
862            vec![(1, vec![(true, "world")])],
863            Some(BitVec::from_slice(&[0b1111])),
864        )
865        .await;
866        assert_eq!(row_ids, None);
867
868        let row_ids = applier_factory(
869            vec![],
870            vec![(1, vec![(false, "Hello")])],
871            Some(BitVec::from_slice(&[0b1111])),
872        )
873        .await;
874        assert_eq!(row_ids, Some(rows([3])));
875
876        let row_ids = applier_factory(
877            vec![],
878            vec![(1, vec![(false, "Hello")])],
879            Some(BitVec::from_slice(&[0b0111])), // row 3 is filtered out
880        )
881        .await;
882        assert_eq!(row_ids, Some(rows([])));
883
884        let row_ids = applier_factory(
885            vec![],
886            vec![(1, vec![(true, "Hello")])],
887            Some(BitVec::from_slice(&[0b1111])),
888        )
889        .await;
890        assert_eq!(row_ids, None);
891
892        let row_ids = applier_factory(
893            vec![],
894            vec![(1, vec![(false, "Hello, World")])],
895            Some(BitVec::from_slice(&[0b1111])),
896        )
897        .await;
898        assert_eq!(row_ids, Some(rows([3])));
899
900        let row_ids = applier_factory(
901            vec![],
902            vec![(1, vec![(false, "Hello, World")])],
903            Some(BitVec::from_slice(&[0b0111])), // row 3 is filtered out
904        )
905        .await;
906        assert_eq!(row_ids, Some(rows([])));
907
908        let row_ids = applier_factory(
909            vec![],
910            vec![(1, vec![(true, "Hello, World")])],
911            Some(BitVec::from_slice(&[0b1111])),
912        )
913        .await;
914        assert_eq!(row_ids, None);
915    }
916
917    #[tokio::test]
918    async fn test_fulltext_index_basic_case_insensitive_tantivy() {
919        let applier_factory = build_fulltext_applier_factory(
920            "test_fulltext_index_basic_case_insensitive_tantivy_",
921            FulltextBackend::Tantivy,
922            &[
923                (None, Some("hello"), None),
924                (None, None, None),
925                (None, Some("world"), None),
926                (None, Some("Hello, World"), None),
927            ],
928        )
929        .await;
930
931        let row_ids = applier_factory(vec![(2, "hello")], vec![], None).await;
932        assert_eq!(row_ids, Some(rows([0, 3])));
933
934        let row_ids = applier_factory(vec![(2, "world")], vec![], None).await;
935        assert_eq!(row_ids, Some(rows([2, 3])));
936
937        let row_ids = applier_factory(vec![(2, "Hello")], vec![], None).await;
938        assert_eq!(row_ids, Some(rows([0, 3])));
939
940        let row_ids = applier_factory(vec![(2, "World")], vec![], None).await;
941        assert_eq!(row_ids, Some(rows([2, 3])));
942
943        let row_ids = applier_factory(vec![], vec![(2, vec![(false, "hello")])], None).await;
944        assert_eq!(row_ids, Some(rows([0, 3])));
945
946        let row_ids = applier_factory(vec![], vec![(2, vec![(true, "hello")])], None).await;
947        assert_eq!(row_ids, Some(rows([0, 3])));
948
949        let row_ids = applier_factory(vec![], vec![(2, vec![(false, "world")])], None).await;
950        assert_eq!(row_ids, Some(rows([2, 3])));
951
952        let row_ids = applier_factory(vec![], vec![(2, vec![(true, "world")])], None).await;
953        assert_eq!(row_ids, Some(rows([2, 3])));
954
955        let row_ids = applier_factory(vec![], vec![(2, vec![(false, "Hello")])], None).await;
956        assert_eq!(row_ids, Some(rows([0, 3])));
957
958        let row_ids = applier_factory(vec![], vec![(2, vec![(true, "Hello")])], None).await;
959        assert_eq!(row_ids, Some(rows([0, 3])));
960
961        let row_ids = applier_factory(vec![], vec![(2, vec![(false, "World")])], None).await;
962        assert_eq!(row_ids, Some(rows([2, 3])));
963
964        let row_ids = applier_factory(vec![], vec![(2, vec![(true, "World")])], None).await;
965        assert_eq!(row_ids, Some(rows([2, 3])));
966    }
967
968    #[tokio::test]
969    async fn test_fulltext_index_basic_case_insensitive_bloom() {
970        let applier_factory = build_fulltext_applier_factory(
971            "test_fulltext_index_basic_case_insensitive_bloom_",
972            FulltextBackend::Bloom,
973            &[
974                (None, Some("hello"), None),
975                (None, None, None),
976                (None, Some("world"), None),
977                (None, Some("Hello, World"), None),
978            ],
979        )
980        .await;
981
982        let row_ids = applier_factory(
983            vec![],
984            vec![(2, vec![(false, "hello")])],
985            Some(BitVec::from_slice(&[0b1111])),
986        )
987        .await;
988        assert_eq!(row_ids, Some(rows([0, 3])));
989
990        let row_ids = applier_factory(
991            vec![],
992            vec![(2, vec![(false, "hello")])],
993            Some(BitVec::from_slice(&[0b1110])), // row 0 is filtered out
994        )
995        .await;
996        assert_eq!(row_ids, Some(rows([3])));
997
998        let row_ids = applier_factory(
999            vec![],
1000            vec![(2, vec![(true, "hello")])],
1001            Some(BitVec::from_slice(&[0b1111])),
1002        )
1003        .await;
1004        assert_eq!(row_ids, Some(rows([0, 3])));
1005
1006        let row_ids = applier_factory(
1007            vec![],
1008            vec![(2, vec![(true, "hello")])],
1009            Some(BitVec::from_slice(&[0b1110])), // row 0 is filtered out
1010        )
1011        .await;
1012        assert_eq!(row_ids, Some(rows([3])));
1013
1014        let row_ids = applier_factory(
1015            vec![],
1016            vec![(2, vec![(false, "world")])],
1017            Some(BitVec::from_slice(&[0b1111])),
1018        )
1019        .await;
1020        assert_eq!(row_ids, Some(rows([2, 3])));
1021
1022        let row_ids = applier_factory(
1023            vec![],
1024            vec![(2, vec![(false, "world")])],
1025            Some(BitVec::from_slice(&[0b1011])), // row 2 is filtered out
1026        )
1027        .await;
1028        assert_eq!(row_ids, Some(rows([3])));
1029
1030        let row_ids = applier_factory(
1031            vec![],
1032            vec![(2, vec![(true, "world")])],
1033            Some(BitVec::from_slice(&[0b1111])),
1034        )
1035        .await;
1036        assert_eq!(row_ids, Some(rows([2, 3])));
1037
1038        let row_ids = applier_factory(
1039            vec![],
1040            vec![(2, vec![(true, "world")])],
1041            Some(BitVec::from_slice(&[0b1011])), // row 2 is filtered out
1042        )
1043        .await;
1044        assert_eq!(row_ids, Some(rows([3])));
1045
1046        let row_ids = applier_factory(
1047            vec![],
1048            vec![(2, vec![(false, "Hello")])],
1049            Some(BitVec::from_slice(&[0b1111])),
1050        )
1051        .await;
1052        assert_eq!(row_ids, Some(rows([0, 3])));
1053
1054        let row_ids = applier_factory(
1055            vec![],
1056            vec![(2, vec![(false, "Hello")])],
1057            Some(BitVec::from_slice(&[0b0111])), // row 3 is filtered out
1058        )
1059        .await;
1060        assert_eq!(row_ids, Some(rows([0])));
1061
1062        let row_ids = applier_factory(
1063            vec![],
1064            vec![(2, vec![(true, "Hello")])],
1065            Some(BitVec::from_slice(&[0b1111])),
1066        )
1067        .await;
1068        assert_eq!(row_ids, Some(rows([0, 3])));
1069
1070        let row_ids = applier_factory(
1071            vec![],
1072            vec![(2, vec![(true, "Hello")])],
1073            Some(BitVec::from_slice(&[0b1110])), // row 0 is filtered out
1074        )
1075        .await;
1076        assert_eq!(row_ids, Some(rows([3])));
1077
1078        let row_ids = applier_factory(
1079            vec![],
1080            vec![(2, vec![(false, "World")])],
1081            Some(BitVec::from_slice(&[0b1111])),
1082        )
1083        .await;
1084        assert_eq!(row_ids, Some(rows([2, 3])));
1085
1086        let row_ids = applier_factory(
1087            vec![],
1088            vec![(2, vec![(false, "World")])],
1089            Some(BitVec::from_slice(&[0b0111])), // row 3 is filtered out
1090        )
1091        .await;
1092        assert_eq!(row_ids, Some(rows([2])));
1093
1094        let row_ids = applier_factory(
1095            vec![],
1096            vec![(2, vec![(true, "World")])],
1097            Some(BitVec::from_slice(&[0b1111])),
1098        )
1099        .await;
1100        assert_eq!(row_ids, Some(rows([2, 3])));
1101
1102        let row_ids = applier_factory(
1103            vec![],
1104            vec![(2, vec![(true, "World")])],
1105            Some(BitVec::from_slice(&[0b1011])), // row 2 is filtered out
1106        )
1107        .await;
1108        assert_eq!(row_ids, Some(rows([3])));
1109    }
1110
1111    #[tokio::test]
1112    async fn test_fulltext_index_basic_chinese_tantivy() {
1113        let applier_factory = build_fulltext_applier_factory(
1114            "test_fulltext_index_basic_chinese_tantivy_",
1115            FulltextBackend::Tantivy,
1116            &[
1117                (None, None, Some("你好")),
1118                (None, None, None),
1119                (None, None, Some("世界")),
1120                (None, None, Some("你好,世界")),
1121            ],
1122        )
1123        .await;
1124
1125        let row_ids = applier_factory(vec![(3, "你好")], vec![], None).await;
1126        assert_eq!(row_ids, Some(rows([0, 3])));
1127
1128        let row_ids = applier_factory(vec![(3, "世界")], vec![], None).await;
1129        assert_eq!(row_ids, Some(rows([2, 3])));
1130
1131        let row_ids = applier_factory(vec![], vec![(3, vec![(false, "你好")])], None).await;
1132        assert_eq!(row_ids, Some(rows([0, 3])));
1133
1134        let row_ids = applier_factory(vec![], vec![(3, vec![(false, "世界")])], None).await;
1135        assert_eq!(row_ids, Some(rows([2, 3])));
1136    }
1137
1138    #[tokio::test]
1139    async fn test_fulltext_index_basic_chinese_bloom() {
1140        let applier_factory = build_fulltext_applier_factory(
1141            "test_fulltext_index_basic_chinese_bloom_",
1142            FulltextBackend::Bloom,
1143            &[
1144                (None, None, Some("你好")),
1145                (None, None, None),
1146                (None, None, Some("世界")),
1147                (None, None, Some("你好,世界")),
1148            ],
1149        )
1150        .await;
1151
1152        let row_ids = applier_factory(
1153            vec![],
1154            vec![(3, vec![(false, "你好")])],
1155            Some(BitVec::from_slice(&[0b1111])),
1156        )
1157        .await;
1158        assert_eq!(row_ids, Some(rows([0, 3])));
1159
1160        let row_ids = applier_factory(
1161            vec![],
1162            vec![(3, vec![(false, "你好")])],
1163            Some(BitVec::from_slice(&[0b1110])), // row 0 is filtered out
1164        )
1165        .await;
1166        assert_eq!(row_ids, Some(rows([3])));
1167
1168        let row_ids = applier_factory(
1169            vec![],
1170            vec![(3, vec![(false, "世界")])],
1171            Some(BitVec::from_slice(&[0b1111])),
1172        )
1173        .await;
1174        assert_eq!(row_ids, Some(rows([2, 3])));
1175
1176        let row_ids = applier_factory(
1177            vec![],
1178            vec![(3, vec![(false, "世界")])],
1179            Some(BitVec::from_slice(&[0b1011])), // row 2 is filtered out
1180        )
1181        .await;
1182        assert_eq!(row_ids, Some(rows([3])));
1183    }
1184
1185    #[tokio::test]
1186    async fn test_fulltext_index_multi_terms_case_sensitive_tantivy() {
1187        let applier_factory = build_fulltext_applier_factory(
1188            "test_fulltext_index_multi_terms_case_sensitive_tantivy_",
1189            FulltextBackend::Tantivy,
1190            &[
1191                (Some("Hello"), None, None),
1192                (Some("World"), None, None),
1193                (None, None, None),
1194                (Some("Hello, World"), None, None),
1195            ],
1196        )
1197        .await;
1198
1199        let row_ids = applier_factory(
1200            vec![],
1201            vec![(1, vec![(false, "hello"), (false, "world")])],
1202            None,
1203        )
1204        .await;
1205        assert_eq!(row_ids, Some(rows([])));
1206
1207        let row_ids = applier_factory(
1208            vec![],
1209            vec![(1, vec![(false, "Hello"), (false, "World")])],
1210            None,
1211        )
1212        .await;
1213        assert_eq!(row_ids, Some(rows([3])));
1214
1215        let row_ids = applier_factory(
1216            vec![],
1217            vec![(1, vec![(true, "Hello"), (false, "World")])],
1218            None,
1219        )
1220        .await;
1221        assert_eq!(row_ids, Some(rows([1, 3])));
1222
1223        let row_ids = applier_factory(
1224            vec![],
1225            vec![(1, vec![(false, "Hello"), (true, "World")])],
1226            None,
1227        )
1228        .await;
1229        assert_eq!(row_ids, Some(rows([0, 3])));
1230
1231        let row_ids = applier_factory(
1232            vec![],
1233            vec![(1, vec![(true, "Hello"), (true, "World")])],
1234            None,
1235        )
1236        .await;
1237        assert_eq!(row_ids, None);
1238    }
1239
1240    #[tokio::test]
1241    async fn test_fulltext_index_multi_terms_case_sensitive_bloom() {
1242        let applier_factory = build_fulltext_applier_factory(
1243            "test_fulltext_index_multi_terms_case_sensitive_bloom_",
1244            FulltextBackend::Bloom,
1245            &[
1246                (Some("Hello"), None, None),
1247                (Some("World"), None, None),
1248                (None, None, None),
1249                (Some("Hello, World"), None, None),
1250            ],
1251        )
1252        .await;
1253
1254        let row_ids = applier_factory(
1255            vec![],
1256            vec![(1, vec![(false, "hello"), (false, "world")])],
1257            Some(BitVec::from_slice(&[0b1111])),
1258        )
1259        .await;
1260        assert_eq!(row_ids, Some(rows([])));
1261
1262        let row_ids = applier_factory(
1263            vec![],
1264            vec![(1, vec![(false, "Hello"), (false, "World")])],
1265            Some(BitVec::from_slice(&[0b1111])),
1266        )
1267        .await;
1268        assert_eq!(row_ids, Some(rows([3])));
1269
1270        let row_ids = applier_factory(
1271            vec![],
1272            vec![(1, vec![(true, "Hello"), (false, "World")])],
1273            Some(BitVec::from_slice(&[0b1111])),
1274        )
1275        .await;
1276        assert_eq!(row_ids, Some(rows([1, 3])));
1277
1278        let row_ids = applier_factory(
1279            vec![],
1280            vec![(1, vec![(false, "Hello"), (true, "World")])],
1281            Some(BitVec::from_slice(&[0b1111])),
1282        )
1283        .await;
1284        assert_eq!(row_ids, Some(rows([0, 3])));
1285
1286        let row_ids = applier_factory(
1287            vec![],
1288            vec![(1, vec![(true, "Hello"), (true, "World")])],
1289            Some(BitVec::from_slice(&[0b1111])),
1290        )
1291        .await;
1292        assert_eq!(row_ids, None);
1293    }
1294
1295    #[tokio::test]
1296    async fn test_fulltext_index_multi_terms_case_insensitive_tantivy() {
1297        let applier_factory = build_fulltext_applier_factory(
1298            "test_fulltext_index_multi_terms_case_insensitive_tantivy_",
1299            FulltextBackend::Tantivy,
1300            &[
1301                (None, Some("hello"), None),
1302                (None, None, None),
1303                (None, Some("world"), None),
1304                (None, Some("Hello, World"), None),
1305            ],
1306        )
1307        .await;
1308
1309        let row_ids = applier_factory(
1310            vec![],
1311            vec![(2, vec![(false, "hello"), (false, "world")])],
1312            None,
1313        )
1314        .await;
1315        assert_eq!(row_ids, Some(rows([3])));
1316
1317        let row_ids = applier_factory(
1318            vec![],
1319            vec![(2, vec![(true, "hello"), (false, "world")])],
1320            None,
1321        )
1322        .await;
1323        assert_eq!(row_ids, Some(rows([3])));
1324
1325        let row_ids = applier_factory(
1326            vec![],
1327            vec![(2, vec![(false, "hello"), (true, "world")])],
1328            None,
1329        )
1330        .await;
1331        assert_eq!(row_ids, Some(rows([3])));
1332
1333        let row_ids = applier_factory(
1334            vec![],
1335            vec![(2, vec![(true, "hello"), (true, "world")])],
1336            None,
1337        )
1338        .await;
1339        assert_eq!(row_ids, Some(rows([3])));
1340    }
1341
1342    #[tokio::test]
1343    async fn test_fulltext_index_multi_terms_case_insensitive_bloom() {
1344        let applier_factory = build_fulltext_applier_factory(
1345            "test_fulltext_index_multi_terms_case_insensitive_bloom_",
1346            FulltextBackend::Bloom,
1347            &[
1348                (None, Some("hello"), None),
1349                (None, None, None),
1350                (None, Some("world"), None),
1351                (None, Some("Hello, World"), None),
1352            ],
1353        )
1354        .await;
1355
1356        let row_ids = applier_factory(
1357            vec![],
1358            vec![(2, vec![(false, "hello"), (false, "world")])],
1359            Some(BitVec::from_slice(&[0b1111])),
1360        )
1361        .await;
1362        assert_eq!(row_ids, Some(rows([3])));
1363
1364        let row_ids = applier_factory(
1365            vec![],
1366            vec![(2, vec![(true, "hello"), (false, "world")])],
1367            Some(BitVec::from_slice(&[0b1111])),
1368        )
1369        .await;
1370        assert_eq!(row_ids, Some(rows([3])));
1371
1372        let row_ids = applier_factory(
1373            vec![],
1374            vec![(2, vec![(false, "hello"), (true, "world")])],
1375            Some(BitVec::from_slice(&[0b1111])),
1376        )
1377        .await;
1378        assert_eq!(row_ids, Some(rows([3])));
1379
1380        let row_ids = applier_factory(
1381            vec![],
1382            vec![(2, vec![(true, "hello"), (true, "world")])],
1383            Some(BitVec::from_slice(&[0b1111])),
1384        )
1385        .await;
1386        assert_eq!(row_ids, Some(rows([3])));
1387    }
1388
1389    #[tokio::test]
1390    async fn test_fulltext_index_multi_columns_tantivy() {
1391        let applier_factory = build_fulltext_applier_factory(
1392            "test_fulltext_index_multi_columns_tantivy_",
1393            FulltextBackend::Tantivy,
1394            &[
1395                (Some("Hello"), None, Some("你好")),
1396                (Some("World"), Some("world"), None),
1397                (None, Some("World"), Some("世界")),
1398                (
1399                    Some("Hello, World"),
1400                    Some("Hello, World"),
1401                    Some("你好,世界"),
1402                ),
1403            ],
1404        )
1405        .await;
1406
1407        let row_ids = applier_factory(
1408            vec![(1, "Hello"), (3, "你好")],
1409            vec![(2, vec![(false, "world")])],
1410            None,
1411        )
1412        .await;
1413        assert_eq!(row_ids, Some(rows([3])));
1414
1415        let row_ids =
1416            applier_factory(vec![(2, "World")], vec![(1, vec![(false, "World")])], None).await;
1417        assert_eq!(row_ids, Some(rows([1, 3])));
1418    }
1419
1420    #[tokio::test]
1421    async fn test_fulltext_index_multi_columns_bloom() {
1422        let applier_factory = build_fulltext_applier_factory(
1423            "test_fulltext_index_multi_columns_bloom_",
1424            FulltextBackend::Bloom,
1425            &[
1426                (Some("Hello"), None, Some("你好")),
1427                (Some("World"), Some("world"), None),
1428                (None, Some("World"), Some("世界")),
1429                (
1430                    Some("Hello, World"),
1431                    Some("Hello, World"),
1432                    Some("你好,世界"),
1433                ),
1434            ],
1435        )
1436        .await;
1437
1438        let row_ids = applier_factory(
1439            vec![],
1440            vec![
1441                (1, vec![(false, "Hello")]),
1442                (2, vec![(false, "world")]),
1443                (3, vec![(false, "你好")]),
1444            ],
1445            Some(BitVec::from_slice(&[0b1111])),
1446        )
1447        .await;
1448        assert_eq!(row_ids, Some(rows([3])));
1449
1450        let row_ids = applier_factory(
1451            vec![],
1452            vec![(1, vec![(false, "World")]), (2, vec![(false, "World")])],
1453            Some(BitVec::from_slice(&[0b1111])),
1454        )
1455        .await;
1456        assert_eq!(row_ids, Some(rows([1, 3])));
1457    }
1458}